aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
authorGravatar Vizerai <jsking@google.com>2018-05-29 10:45:32 -0700
committerGravatar Vizerai <jsking@google.com>2018-05-29 10:45:32 -0700
commite501a3d3fdd63a4e8c184ff2bcb1e643dbfe4401 (patch)
treef83b87e440fe49217353e7daff913e7e6629e3e5 /src/python
parente1d7deeb5396691ae8ea515b0db6e778a2c0a59d (diff)
parent1bd0debdc7f57f2d51716789660b895a6c229350 (diff)
Merge branch 'master' of https://github.com/Vizerai/grpc into filter_port
Diffstat (limited to 'src/python')
-rw-r--r--src/python/grpcio/grpc/__init__.py17
-rw-r--r--src/python/grpcio/grpc/_channel.py445
-rw-r--r--src/python/grpcio/grpc/_common.py34
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi56
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi477
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi8
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi93
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi1
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi10
-rw-r--r--src/python/grpcio/grpc/_grpcio_metadata.py2
-rw-r--r--src/python/grpcio/grpc/_interceptor.py13
-rw-r--r--src/python/grpcio/grpc/_server.py10
-rw-r--r--src/python/grpcio/grpc/beta/_server_adaptations.py7
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py6
-rw-r--r--src/python/grpcio/grpc_version.py2
-rw-r--r--src/python/grpcio_health_checking/grpc_version.py2
-rw-r--r--src/python/grpcio_health_checking/setup.py2
-rw-r--r--src/python/grpcio_reflection/grpc_version.py2
-rw-r--r--src/python/grpcio_reflection/setup.py2
-rw-r--r--src/python/grpcio_testing/grpc_testing/_channel/_channel.py15
-rw-r--r--src/python/grpcio_testing/grpc_version.py2
-rw-r--r--src/python/grpcio_testing/setup.py2
-rw-r--r--src/python/grpcio_tests/grpc_version.py2
-rw-r--r--src/python/grpcio_tests/setup.py7
-rw-r--r--src/python/grpcio_tests/tests/_loader.py2
-rw-r--r--src/python/grpcio_tests/tests/_result.py4
-rw-r--r--src/python/grpcio_tests/tests/interop/client.py4
-rw-r--r--src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py4
-rw-r--r--src/python/grpcio_tests/tests/qps/benchmark_client.py5
-rw-r--r--src/python/grpcio_tests/tests/qps/benchmark_server.py7
-rw-r--r--src/python/grpcio_tests/tests/qps/qps_worker.py5
-rw-r--r--src/python/grpcio_tests/tests/qps/worker_server.py6
-rw-r--r--src/python/grpcio_tests/tests/stress/test_runner.py2
-rw-r--r--src/python/grpcio_tests/tests/testing/_client_application.py24
-rw-r--r--src/python/grpcio_tests/tests/testing/_server_application.py2
-rw-r--r--src/python/grpcio_tests/tests/testing/_server_test.py5
-rw-r--r--src/python/grpcio_tests/tests/tests.json2
-rw-r--r--src/python/grpcio_tests/tests/unit/_channel_close_test.py185
-rw-r--r--src/python/grpcio_tests/tests/unit/_compression_test.py4
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py52
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_channel_test.py28
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_common.py3
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py54
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py55
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py73
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py114
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/test_utilities.py4
-rw-r--r--src/python/grpcio_tests/tests/unit/_exit_test.py2
-rw-r--r--src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py2
-rw-r--r--src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py49
-rw-r--r--src/python/grpcio_tests/tests/unit/_invocation_defects_test.py4
-rw-r--r--src/python/grpcio_tests/tests/unit/_rpc_test.py8
-rw-r--r--src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py115
-rw-r--r--src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py2
55 files changed, 1260 insertions, 785 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 7fa7303691..b7ed0c8563 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -813,7 +813,11 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
class Channel(six.with_metaclass(abc.ABCMeta)):
- """Affords RPC invocation via generic methods on client-side."""
+ """Affords RPC invocation via generic methods on client-side.
+
+ Channel objects implement the Context Manager type, although they need not
+ support being entered and exited multiple times.
+ """
@abc.abstractmethod
def subscribe(self, callback, try_to_connect=False):
@@ -926,6 +930,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)):
"""
raise NotImplementedError()
+ @abc.abstractmethod
+ def close(self):
+ """Closes this Channel and releases all resources held by it.
+
+ Closing the Channel will immediately terminate all RPCs active with the
+ Channel and it is not valid to invoke new RPCs with the Channel.
+
+ This method is idempotent.
+ """
+ raise NotImplementedError()
+
########################## Service-Side Context ##############################
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 2eff08aa57..2017d47130 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -58,6 +58,17 @@ _STREAM_STREAM_INITIAL_DUE = (
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')
+_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n'
+ '\tstatus = {}\n'
+ '\tdetails = "{}"\n'
+ '>')
+
+_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n'
+ '\tstatus = {}\n'
+ '\tdetails = "{}"\n'
+ '\tdebug_error_string = "{}"\n'
+ '>')
+
def _deadline(timeout):
return None if timeout is None else time.time() + timeout
@@ -79,27 +90,6 @@ def _wait_once_until(condition, until):
condition.wait(timeout=remaining)
-_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
- 'Internal gRPC call error %d. ' +
- 'Please report to https://github.com/grpc/grpc/issues')
-
-
-def _check_call_error(call_error, metadata):
- if call_error == cygrpc.CallError.invalid_metadata:
- raise ValueError('metadata was invalid: %s' % metadata)
- elif call_error != cygrpc.CallError.ok:
- raise ValueError(_INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error)
-
-
-def _call_error_set_RPCstate(state, call_error, metadata):
- if call_error == cygrpc.CallError.invalid_metadata:
- _abort(state, grpc.StatusCode.INTERNAL,
- 'metadata was invalid: %s' % metadata)
- else:
- _abort(state, grpc.StatusCode.INTERNAL,
- _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error)
-
-
class _RPCState(object):
def __init__(self, due, initial_metadata, trailing_metadata, code, details):
@@ -112,6 +102,7 @@ class _RPCState(object):
self.trailing_metadata = trailing_metadata
self.code = code
self.details = details
+ self.debug_error_string = None
# The semantics of grpc.Future.cancel and grpc.Future.cancelled are
# slightly wonky, so they have to be tracked separately from the rest of the
# result of the RPC. This field tracks whether cancellation was requested
@@ -158,12 +149,13 @@ def _handle_event(event, state, response_deserializer):
else:
state.code = code
state.details = batch_operation.details()
+ state.debug_error_string = batch_operation.error_string()
callbacks.extend(state.callbacks)
state.callbacks = None
return callbacks
-def _event_handler(state, call, response_deserializer):
+def _event_handler(state, response_deserializer):
def handle_event(event):
with state.condition:
@@ -172,40 +164,47 @@ def _event_handler(state, call, response_deserializer):
done = not state.due
for callback in callbacks:
callback()
- return call if done else None
+ return done
return handle_event
-def _consume_request_iterator(request_iterator, state, call,
- request_serializer):
- event_handler = _event_handler(state, call, None)
+def _consume_request_iterator(request_iterator, state, call, request_serializer,
+ event_handler):
- def consume_request_iterator():
+ def consume_request_iterator(): # pylint: disable=too-many-branches
while True:
try:
request = next(request_iterator)
except StopIteration:
break
except Exception: # pylint: disable=broad-except
- logging.exception("Exception iterating requests!")
- call.cancel()
- _abort(state, grpc.StatusCode.UNKNOWN,
- "Exception iterating requests!")
+ code = grpc.StatusCode.UNKNOWN
+ details = 'Exception iterating requests!'
+ logging.exception(details)
+ call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
+ details)
+ _abort(state, code, details)
return
serialized_request = _common.serialize(request, request_serializer)
with state.condition:
if state.code is None and not state.cancelled:
if serialized_request is None:
- call.cancel()
+ code = grpc.StatusCode.INTERNAL # pylint: disable=redefined-variable-type
details = 'Exception serializing request!'
- _abort(state, grpc.StatusCode.INTERNAL, details)
+ call.cancel(
+ _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
+ details)
+ _abort(state, code, details)
return
else:
operations = (cygrpc.SendMessageOperation(
serialized_request, _EMPTY_FLAGS),)
- call.start_client_batch(operations, event_handler)
- state.due.add(cygrpc.OperationType.send_message)
+ operating = call.operate(operations, event_handler)
+ if operating:
+ state.due.add(cygrpc.OperationType.send_message)
+ else:
+ return
while True:
state.condition.wait()
if state.code is None:
@@ -219,19 +218,12 @@ def _consume_request_iterator(request_iterator, state, call,
if state.code is None:
operations = (
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
- call.start_client_batch(operations, event_handler)
- state.due.add(cygrpc.OperationType.send_close_from_client)
-
- def stop_consumption_thread(timeout): # pylint: disable=unused-argument
- with state.condition:
- if state.code is None:
- call.cancel()
- state.cancelled = True
- _abort(state, grpc.StatusCode.CANCELLED, 'Cancelled!')
- state.condition.notify_all()
+ operating = call.operate(operations, event_handler)
+ if operating:
+ state.due.add(cygrpc.OperationType.send_close_from_client)
- consumption_thread = _common.CleanupThread(
- stop_consumption_thread, target=consume_request_iterator)
+ consumption_thread = threading.Thread(target=consume_request_iterator)
+ consumption_thread.daemon = True
consumption_thread.start()
@@ -247,9 +239,12 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
def cancel(self):
with self._state.condition:
if self._state.code is None:
- self._call.cancel()
+ code = grpc.StatusCode.CANCELLED
+ details = 'Locally cancelled by application!'
+ self._call.cancel(
+ _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
self._state.cancelled = True
- _abort(self._state, grpc.StatusCode.CANCELLED, 'Cancelled!')
+ _abort(self._state, code, details)
self._state.condition.notify_all()
return False
@@ -318,12 +313,13 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
def _next(self):
with self._state.condition:
if self._state.code is None:
- event_handler = _event_handler(self._state, self._call,
+ event_handler = _event_handler(self._state,
self._response_deserializer)
- self._call.start_client_batch(
+ operating = self._call.operate(
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
event_handler)
- self._state.due.add(cygrpc.OperationType.receive_message)
+ if operating:
+ self._state.due.add(cygrpc.OperationType.receive_message)
elif self._state.code is grpc.StatusCode.OK:
raise StopIteration()
else:
@@ -391,13 +387,23 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
self._state.condition.wait()
return _common.decode(self._state.details)
+ def debug_error_string(self):
+ with self._state.condition:
+ while self._state.debug_error_string is None:
+ self._state.condition.wait()
+ return _common.decode(self._state.debug_error_string)
+
def _repr(self):
with self._state.condition:
if self._state.code is None:
return '<_Rendezvous object of in-flight RPC>'
+ elif self._state.code is grpc.StatusCode.OK:
+ return _OK_RENDEZVOUS_REPR_FORMAT.format(
+ self._state.code, self._state.details)
else:
- return '<_Rendezvous of RPC that terminated with ({}, {})>'.format(
- self._state.code, _common.decode(self._state.details))
+ return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
+ self._state.code, self._state.details,
+ self._state.debug_error_string)
def __repr__(self):
return self._repr()
@@ -408,9 +414,12 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
def __del__(self):
with self._state.condition:
if self._state.code is None:
- self._call.cancel()
- self._state.cancelled = True
self._state.code = grpc.StatusCode.CANCELLED
+ self._state.details = 'Cancelled upon garbage collection!'
+ self._state.cancelled = True
+ self._call.cancel(
+ _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
+ self._state.details)
self._state.condition.notify_all()
@@ -437,6 +446,24 @@ def _end_unary_response_blocking(state, call, with_call, deadline):
raise _Rendezvous(state, None, None, deadline)
+def _stream_unary_invocation_operationses(metadata):
+ return (
+ (
+ cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+ ),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+ )
+
+
+def _stream_unary_invocation_operationses_and_tags(metadata):
+ return tuple((
+ operations,
+ None,
+ ) for operations in _stream_unary_invocation_operationses(metadata))
+
+
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def __init__(self, channel, managed_call, method, request_serializer,
@@ -448,8 +475,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
self._response_deserializer = response_deserializer
def _prepare(self, request, timeout, metadata):
- deadline, serialized_request, rendezvous = (_start_unary_request(
- request, timeout, self._request_serializer))
+ deadline, serialized_request, rendezvous = _start_unary_request(
+ request, timeout, self._request_serializer)
if serialized_request is None:
return None, None, None, rendezvous
else:
@@ -467,48 +494,38 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def _blocking(self, request, timeout, metadata, credentials):
state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata)
- if rendezvous:
+ if state is None:
raise rendezvous
else:
- completion_queue = cygrpc.CompletionQueue()
- call = self._channel.create_call(None, 0, completion_queue,
- self._method, None, deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- call_error = call.start_client_batch(operations, None)
- _check_call_error(call_error, metadata)
- _handle_event(completion_queue.poll(), state,
- self._response_deserializer)
- return state, call, deadline
+ call = self._channel.segregated_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials, ((
+ operations,
+ None,
+ ),))
+ event = call.next_event()
+ _handle_event(event, state, self._response_deserializer)
+ return state, call,
def __call__(self, request, timeout=None, metadata=None, credentials=None):
- state, call, deadline = self._blocking(request, timeout, metadata,
- credentials)
- return _end_unary_response_blocking(state, call, False, deadline)
+ state, call, = self._blocking(request, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, call, False, None)
def with_call(self, request, timeout=None, metadata=None, credentials=None):
- state, call, deadline = self._blocking(request, timeout, metadata,
- credentials)
- return _end_unary_response_blocking(state, call, True, deadline)
+ state, call, = self._blocking(request, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, call, True, None)
def future(self, request, timeout=None, metadata=None, credentials=None):
state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata)
- if rendezvous:
- return rendezvous
+ if state is None:
+ raise rendezvous
else:
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call,
- self._response_deserializer)
- with state.condition:
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ (operations,), event_handler)
return _Rendezvous(state, call, self._response_deserializer,
deadline)
@@ -524,34 +541,27 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._response_deserializer = response_deserializer
def __call__(self, request, timeout=None, metadata=None, credentials=None):
- deadline, serialized_request, rendezvous = (_start_unary_request(
- request, timeout, self._request_serializer))
+ deadline, serialized_request, rendezvous = _start_unary_request(
+ request, timeout, self._request_serializer)
if serialized_request is None:
raise rendezvous
else:
state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call,
- self._response_deserializer)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
- event_handler)
- operations = (
+ operationses = (
+ (
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
cygrpc.SendMessageOperation(serialized_request,
_EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
+ ),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+ )
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ operationses, event_handler)
return _Rendezvous(state, call, self._response_deserializer,
deadline)
@@ -569,49 +579,38 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
def _blocking(self, request_iterator, timeout, metadata, credentials):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
- completion_queue = cygrpc.CompletionQueue()
- call = self._channel.create_call(None, 0, completion_queue,
- self._method, None, deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None)
- operations = (
- cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, None)
- _check_call_error(call_error, metadata)
- _consume_request_iterator(request_iterator, state, call,
- self._request_serializer)
+ call = self._channel.segregated_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ _stream_unary_invocation_operationses_and_tags(metadata))
+ _consume_request_iterator(request_iterator, state, call,
+ self._request_serializer, None)
while True:
- event = completion_queue.poll()
+ event = call.next_event()
with state.condition:
_handle_event(event, state, self._response_deserializer)
state.condition.notify_all()
if not state.due:
break
- return state, call, deadline
+ return state, call,
def __call__(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
- state, call, deadline = self._blocking(request_iterator, timeout,
- metadata, credentials)
- return _end_unary_response_blocking(state, call, False, deadline)
+ state, call, = self._blocking(request_iterator, timeout, metadata,
+ credentials)
+ return _end_unary_response_blocking(state, call, False, None)
def with_call(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
- state, call, deadline = self._blocking(request_iterator, timeout,
- metadata, credentials)
- return _end_unary_response_blocking(state, call, True, deadline)
+ state, call, = self._blocking(request_iterator, timeout, metadata,
+ credentials)
+ return _end_unary_response_blocking(state, call, True, None)
def future(self,
request_iterator,
@@ -620,27 +619,13 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
credentials=None):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call, self._response_deserializer)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
- event_handler)
- operations = (
- cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
- _consume_request_iterator(request_iterator, state, call,
- self._request_serializer)
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ _stream_unary_invocation_operationses(metadata), event_handler)
+ _consume_request_iterator(request_iterator, state, call,
+ self._request_serializer, event_handler)
return _Rendezvous(state, call, self._response_deserializer, deadline)
@@ -661,26 +646,20 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
credentials=None):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call, self._response_deserializer)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
- event_handler)
- operations = (
+ operationses = (
+ (
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
- _consume_request_iterator(request_iterator, state, call,
- self._request_serializer)
+ ),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+ )
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials, operationses,
+ event_handler)
+ _consume_request_iterator(request_iterator, state, call,
+ self._request_serializer, event_handler)
return _Rendezvous(state, call, self._response_deserializer, deadline)
@@ -689,67 +668,63 @@ class _ChannelCallState(object):
def __init__(self, channel):
self.lock = threading.Lock()
self.channel = channel
- self.completion_queue = cygrpc.CompletionQueue()
- self.managed_calls = None
+ self.managed_calls = 0
def _run_channel_spin_thread(state):
def channel_spin():
while True:
- event = state.completion_queue.poll()
- completed_call = event.tag(event)
- if completed_call is not None:
+ event = state.channel.next_call_event()
+ call_completed = event.tag(event)
+ if call_completed:
with state.lock:
- state.managed_calls.remove(completed_call)
- if not state.managed_calls:
- state.managed_calls = None
+ state.managed_calls -= 1
+ if state.managed_calls == 0:
return
- def stop_channel_spin(timeout): # pylint: disable=unused-argument
- with state.lock:
- if state.managed_calls is not None:
- for call in state.managed_calls:
- call.cancel()
-
- channel_spin_thread = _common.CleanupThread(
- stop_channel_spin, target=channel_spin)
+ channel_spin_thread = threading.Thread(target=channel_spin)
+ channel_spin_thread.daemon = True
channel_spin_thread.start()
def _channel_managed_call_management(state):
- def create(parent, flags, method, host, deadline):
- """Creates a managed cygrpc.Call and a function to call to drive it.
-
- If operations are successfully added to the returned cygrpc.Call, the
- returned function must be called. If operations are not successfully added
- to the returned cygrpc.Call, the returned function must not be called.
-
- Args:
- parent: A cygrpc.Call to be used as the parent of the created call.
- flags: An integer bitfield of call flags.
- method: The RPC method.
- host: A host string for the created call.
- deadline: A float to be the deadline of the created call or None if the
- call is to have an infinite deadline.
-
- Returns:
- A cygrpc.Call with which to conduct an RPC and a function to call if
- operations are successfully started on the call.
- """
- call = state.channel.create_call(parent, flags, state.completion_queue,
- method, host, deadline)
-
- def drive():
- with state.lock:
- if state.managed_calls is None:
- state.managed_calls = set((call,))
- _run_channel_spin_thread(state)
- else:
- state.managed_calls.add(call)
+ # pylint: disable=too-many-arguments
+ def create(flags, method, host, deadline, metadata, credentials,
+ operationses, event_handler):
+ """Creates a cygrpc.IntegratedCall.
- return call, drive
+ Args:
+ flags: An integer bitfield of call flags.
+ method: The RPC method.
+ host: A host string for the created call.
+ deadline: A float to be the deadline of the created call or None if
+ the call is to have an infinite deadline.
+ metadata: The metadata for the call or None.
+ credentials: A cygrpc.CallCredentials or None.
+ operationses: An iterable of iterables of cygrpc.Operations to be
+ started on the call.
+ event_handler: A behavior to call to handle the events resultant from
+ the operations on the call.
+
+ Returns:
+ A cygrpc.IntegratedCall with which to conduct an RPC.
+ """
+ operationses_and_tags = tuple((
+ operations,
+ event_handler,
+ ) for operations in operationses)
+ with state.lock:
+ call = state.channel.integrated_call(flags, method, host, deadline,
+ metadata, credentials,
+ operationses_and_tags)
+ if state.managed_calls == 0:
+ state.managed_calls = 1
+ _run_channel_spin_thread(state)
+ else:
+ state.managed_calls += 1
+ return call
return create
@@ -819,12 +794,9 @@ def _poll_connectivity(state, channel, initial_try_to_connect):
callback_and_connectivity[1] = state.connectivity
if callbacks:
_spawn_delivery(state, callbacks)
- completion_queue = cygrpc.CompletionQueue()
while True:
- channel.watch_connectivity_state(connectivity,
- time.time() + 0.2, completion_queue,
- None)
- event = completion_queue.poll()
+ event = channel.watch_connectivity_state(connectivity,
+ time.time() + 0.2)
with state.lock:
if not state.callbacks_and_connectivities and not state.try_to_connect:
state.polling = False
@@ -855,10 +827,10 @@ def _moot(state):
def _subscribe(state, callback, try_to_connect):
with state.lock:
if not state.callbacks_and_connectivities and not state.polling:
- polling_thread = _common.CleanupThread(
- lambda timeout: _moot(state),
+ polling_thread = threading.Thread(
target=_poll_connectivity,
args=(state, state.channel, bool(try_to_connect)))
+ polling_thread.daemon = True
polling_thread.start()
state.polling = True
state.callbacks_and_connectivities.append([callback, None])
@@ -944,5 +916,28 @@ class Channel(grpc.Channel):
self._channel, _channel_managed_call_management(self._call_state),
_common.encode(method), request_serializer, response_deserializer)
+ def _close(self):
+ self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
+ _moot(self._connectivity_state)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._close()
+ return False
+
+ def close(self):
+ self._close()
+
def __del__(self):
+ # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
+ # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
+ # here (or more likely, call self._close() here). We don't do this today
+ # because many valid use cases today allow the channel to be deleted
+ # immediately after stubs are created. After a sufficient period of time
+ # has passed for all users to be trusted to hang out to their channels
+ # for as long as they are in use and to close them after using them,
+ # then deletion of this grpc._channel.Channel instance can be made to
+ # effect closure of the underlying cygrpc.Channel instance.
_moot(self._connectivity_state)
diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py
index bbb69ad489..862987a0cd 100644
--- a/src/python/grpcio/grpc/_common.py
+++ b/src/python/grpcio/grpc/_common.py
@@ -14,8 +14,6 @@
"""Shared implementation."""
import logging
-import threading
-import time
import six
@@ -101,35 +99,3 @@ def deserialize(serialized_message, deserializer):
def fully_qualified_method(group, method):
return '/{}/{}'.format(group, method)
-
-
-class CleanupThread(threading.Thread):
- """A threading.Thread subclass supporting custom behavior on join().
-
- On Python Interpreter exit, Python will attempt to join outstanding threads
- prior to garbage collection. We may need to do additional cleanup, and
- we accomplish this by overriding the join() method.
- """
-
- def __init__(self, behavior, *args, **kwargs):
- """Constructor.
-
- Args:
- behavior (function): Function called on join() with a single
- argument, timeout, indicating the maximum duration of
- `behavior`, or None indicating `behavior` has no deadline.
- `behavior` must be idempotent.
- args: Positional arguments passed to threading.Thread constructor.
- kwargs: Keyword arguments passed to threading.Thread constructor.
- """
- super(CleanupThread, self).__init__(*args, **kwargs)
- self._behavior = behavior
-
- def join(self, timeout=None):
- start_time = time.time()
- self._behavior(timeout)
- end_time = time.time()
- if timeout is not None:
- timeout -= end_time - start_time
- timeout = max(timeout, 0)
- super(CleanupThread, self).join(timeout)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
index 1ba76b7f83..eefc685c0b 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
@@ -13,9 +13,59 @@
# limitations under the License.
+cdef _check_call_error_no_metadata(c_call_error)
+
+
+cdef _check_and_raise_call_error_no_metadata(c_call_error)
+
+
+cdef _check_call_error(c_call_error, metadata)
+
+
+cdef class _CallState:
+
+ cdef grpc_call *c_call
+ cdef set due
+
+
+cdef class _ChannelState:
+
+ cdef object condition
+ cdef grpc_channel *c_channel
+ # A boolean field indicating that the channel is open (if True) or is being
+ # closed (i.e. a call to close is currently executing) or is closed (if
+ # False).
+ # TODO(https://github.com/grpc/grpc/issues/3064): Eliminate "is being closed"
+ # a state in which condition may be acquired by any thread, eliminate this
+ # field and just use the NULLness of c_channel as an indication that the
+ # channel is closed.
+ cdef object open
+
+ # A dict from _BatchOperationTag to _CallState
+ cdef dict integrated_call_states
+ cdef grpc_completion_queue *c_call_completion_queue
+
+ # A set of _CallState
+ cdef set segregated_call_states
+
+ cdef set connectivity_due
+ cdef grpc_completion_queue *c_connectivity_completion_queue
+
+
+cdef class IntegratedCall:
+
+ cdef _ChannelState _channel_state
+ cdef _CallState _call_state
+
+
+cdef class SegregatedCall:
+
+ cdef _ChannelState _channel_state
+ cdef _CallState _call_state
+ cdef grpc_completion_queue *_c_completion_queue
+
+
cdef class Channel:
cdef grpc_arg_pointer_vtable _vtable
- cdef grpc_channel *c_channel
- cdef list references
- cdef readonly _ArgumentsProcessor _arguments_processor
+ cdef _ChannelState _state
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index a3966497bc..72e74e84ae 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -14,82 +14,439 @@
cimport cpython
+import threading
+
+_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
+ 'Internal gRPC call error %d. ' +
+ 'Please report to https://github.com/grpc/grpc/issues')
+
+
+cdef str _call_error_metadata(metadata):
+ return 'metadata was invalid: %s' % metadata
+
+
+cdef str _call_error_no_metadata(c_call_error):
+ return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
+
+
+cdef str _call_error(c_call_error, metadata):
+ if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
+ return _call_error_metadata(metadata)
+ else:
+ return _call_error_no_metadata(c_call_error)
+
+
+cdef _check_call_error_no_metadata(c_call_error):
+ if c_call_error != GRPC_CALL_OK:
+ return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
+ else:
+ return None
+
+
+cdef _check_and_raise_call_error_no_metadata(c_call_error):
+ error = _check_call_error_no_metadata(c_call_error)
+ if error is not None:
+ raise ValueError(error)
+
+
+cdef _check_call_error(c_call_error, metadata):
+ if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
+ return _call_error_metadata(metadata)
+ else:
+ return _check_call_error_no_metadata(c_call_error)
+
+
+cdef void _raise_call_error_no_metadata(c_call_error) except *:
+ raise ValueError(_call_error_no_metadata(c_call_error))
+
+
+cdef void _raise_call_error(c_call_error, metadata) except *:
+ raise ValueError(_call_error(c_call_error, metadata))
+
+
+cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue):
+ grpc_completion_queue_shutdown(c_completion_queue)
+ grpc_completion_queue_destroy(c_completion_queue)
+
+
+cdef class _CallState:
+
+ def __cinit__(self):
+ self.due = set()
+
+
+cdef class _ChannelState:
+
+ def __cinit__(self):
+ self.condition = threading.Condition()
+ self.open = True
+ self.integrated_call_states = {}
+ self.segregated_call_states = set()
+ self.connectivity_due = set()
+
+
+cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
+ cdef grpc_call_error c_call_error
+ cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None)
+ tag.prepare()
+ cpython.Py_INCREF(tag)
+ with nogil:
+ c_call_error = grpc_call_start_batch(
+ c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL)
+ return c_call_error, tag
+
+
+cdef object _operate_from_integrated_call(
+ _ChannelState channel_state, _CallState call_state, object operations,
+ object user_tag):
+ cdef grpc_call_error c_call_error
+ cdef _BatchOperationTag tag
+ with channel_state.condition:
+ if call_state.due:
+ c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
+ if c_call_error == GRPC_CALL_OK:
+ call_state.due.add(tag)
+ channel_state.integrated_call_states[tag] = call_state
+ return True
+ else:
+ _raise_call_error_no_metadata(c_call_error)
+ else:
+ return False
+
+
+cdef object _operate_from_segregated_call(
+ _ChannelState channel_state, _CallState call_state, object operations,
+ object user_tag):
+ cdef grpc_call_error c_call_error
+ cdef _BatchOperationTag tag
+ with channel_state.condition:
+ if call_state.due:
+ c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
+ if c_call_error == GRPC_CALL_OK:
+ call_state.due.add(tag)
+ return True
+ else:
+ _raise_call_error_no_metadata(c_call_error)
+ else:
+ return False
+
+
+cdef _cancel(
+ _ChannelState channel_state, _CallState call_state, grpc_status_code code,
+ str details):
+ cdef grpc_call_error c_call_error
+ with channel_state.condition:
+ if call_state.due:
+ c_call_error = grpc_call_cancel_with_status(
+ call_state.c_call, code, _encode(details), NULL)
+ _check_and_raise_call_error_no_metadata(c_call_error)
+
+
+cdef BatchOperationEvent _next_call_event(
+ _ChannelState channel_state, grpc_completion_queue *c_completion_queue,
+ on_success):
+ tag, event = _latent_event(c_completion_queue, None)
+ with channel_state.condition:
+ on_success(tag)
+ channel_state.condition.notify_all()
+ return event
+
+
+# TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler.
+cdef void _call(
+ _ChannelState channel_state, _CallState call_state,
+ grpc_completion_queue *c_completion_queue, on_success, int flags, method,
+ host, object deadline, CallCredentials credentials,
+ object operationses_and_user_tags, object metadata) except *:
+ """Invokes an RPC.
+
+ Args:
+ channel_state: A _ChannelState with its "open" attribute set to True. RPCs
+ may not be invoked on a closed channel.
+ call_state: An empty _CallState to be altered (specifically assigned a
+ c_call and having its due set populated) if the RPC invocation is
+ successful.
+ c_completion_queue: A grpc_completion_queue to be used for the call's
+ operations.
+ on_success: A behavior to be called if attempting to start operations for
+ the call succeeds. If called the behavior will be called while holding the
+ channel_state condition and passed the tags associated with operations
+ that were successfully started for the call.
+ flags: Flags to be passed to gRPC Core as part of call creation.
+ method: The fully-qualified name of the RPC method being invoked.
+ host: A "host" string to be passed to gRPC Core as part of call creation.
+ deadline: A float for the deadline of the RPC, or None if the RPC is to have
+ no deadline.
+ credentials: A _CallCredentials for the RPC or None.
+ operationses_and_user_tags: A sequence of length-two sequences the first
+ element of which is a sequence of Operations and the second element of
+ which is an object to be used as a tag. A SendInitialMetadataOperation
+ must be present in the first element of this value.
+ metadata: The metadata for this call.
+ """
+ cdef grpc_slice method_slice
+ cdef grpc_slice host_slice
+ cdef grpc_slice *host_slice_ptr
+ cdef grpc_call_credentials *c_call_credentials
+ cdef grpc_call_error c_call_error
+ cdef tuple error_and_wrapper_tag
+ cdef _BatchOperationTag wrapper_tag
+ with channel_state.condition:
+ if channel_state.open:
+ method_slice = _slice_from_bytes(method)
+ if host is None:
+ host_slice_ptr = NULL
+ else:
+ host_slice = _slice_from_bytes(host)
+ host_slice_ptr = &host_slice
+ call_state.c_call = grpc_channel_create_call(
+ channel_state.c_channel, NULL, flags,
+ c_completion_queue, method_slice, host_slice_ptr,
+ _timespec_from_time(deadline), NULL)
+ grpc_slice_unref(method_slice)
+ if host_slice_ptr:
+ grpc_slice_unref(host_slice)
+ if credentials is not None:
+ c_call_credentials = credentials.c()
+ c_call_error = grpc_call_set_credentials(
+ call_state.c_call, c_call_credentials)
+ grpc_call_credentials_release(c_call_credentials)
+ if c_call_error != GRPC_CALL_OK:
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+ _raise_call_error_no_metadata(c_call_error)
+ started_tags = set()
+ for operations, user_tag in operationses_and_user_tags:
+ c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
+ if c_call_error == GRPC_CALL_OK:
+ started_tags.add(tag)
+ else:
+ grpc_call_cancel(call_state.c_call, NULL)
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+ _raise_call_error(c_call_error, metadata)
+ else:
+ call_state.due.update(started_tags)
+ on_success(started_tags)
+ else:
+ raise ValueError('Cannot invoke RPC on closed channel!')
+
+cdef void _process_integrated_call_tag(
+ _ChannelState state, _BatchOperationTag tag) except *:
+ cdef _CallState call_state = state.integrated_call_states.pop(tag)
+ call_state.due.remove(tag)
+ if not call_state.due:
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+
+
+cdef class IntegratedCall:
+
+ def __cinit__(self, _ChannelState channel_state, _CallState call_state):
+ self._channel_state = channel_state
+ self._call_state = call_state
+
+ def operate(self, operations, tag):
+ return _operate_from_integrated_call(
+ self._channel_state, self._call_state, operations, tag)
+
+ def cancel(self, code, details):
+ _cancel(self._channel_state, self._call_state, code, details)
+
+
+cdef IntegratedCall _integrated_call(
+ _ChannelState state, int flags, method, host, object deadline,
+ object metadata, CallCredentials credentials, operationses_and_user_tags):
+ call_state = _CallState()
+
+ def on_success(started_tags):
+ for started_tag in started_tags:
+ state.integrated_call_states[started_tag] = call_state
+
+ _call(
+ state, call_state, state.c_call_completion_queue, on_success, flags,
+ method, host, deadline, credentials, operationses_and_user_tags, metadata)
+
+ return IntegratedCall(state, call_state)
+
+
+cdef object _process_segregated_call_tag(
+ _ChannelState state, _CallState call_state,
+ grpc_completion_queue *c_completion_queue, _BatchOperationTag tag):
+ call_state.due.remove(tag)
+ if not call_state.due:
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+ state.segregated_call_states.remove(call_state)
+ _destroy_c_completion_queue(c_completion_queue)
+ return True
+ else:
+ return False
+
+
+cdef class SegregatedCall:
+
+ def __cinit__(self, _ChannelState channel_state, _CallState call_state):
+ self._channel_state = channel_state
+ self._call_state = call_state
+
+ def operate(self, operations, tag):
+ return _operate_from_segregated_call(
+ self._channel_state, self._call_state, operations, tag)
+
+ def cancel(self, code, details):
+ _cancel(self._channel_state, self._call_state, code, details)
+
+ def next_event(self):
+ def on_success(tag):
+ _process_segregated_call_tag(
+ self._channel_state, self._call_state, self._c_completion_queue, tag)
+ return _next_call_event(
+ self._channel_state, self._c_completion_queue, on_success)
+
+
+cdef SegregatedCall _segregated_call(
+ _ChannelState state, int flags, method, host, object deadline,
+ object metadata, CallCredentials credentials, operationses_and_user_tags):
+ cdef _CallState call_state = _CallState()
+ cdef grpc_completion_queue *c_completion_queue = (
+ grpc_completion_queue_create_for_next(NULL))
+ cdef SegregatedCall segregated_call
+
+ def on_success(started_tags):
+ state.segregated_call_states.add(call_state)
+
+ try:
+ _call(
+ state, call_state, c_completion_queue, on_success, flags, method, host,
+ deadline, credentials, operationses_and_user_tags, metadata)
+ except:
+ _destroy_c_completion_queue(c_completion_queue)
+ raise
+
+ segregated_call = SegregatedCall(state, call_state)
+ segregated_call._c_completion_queue = c_completion_queue
+ return segregated_call
+
+
+cdef object _watch_connectivity_state(
+ _ChannelState state, grpc_connectivity_state last_observed_state,
+ object deadline):
+ cdef _ConnectivityTag tag = _ConnectivityTag(object())
+ with state.condition:
+ if state.open:
+ cpython.Py_INCREF(tag)
+ grpc_channel_watch_connectivity_state(
+ state.c_channel, last_observed_state, _timespec_from_time(deadline),
+ state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
+ state.connectivity_due.add(tag)
+ else:
+ raise ValueError('Cannot invoke RPC on closed channel!')
+ completed_tag, event = _latent_event(
+ state.c_connectivity_completion_queue, None)
+ with state.condition:
+ state.connectivity_due.remove(completed_tag)
+ state.condition.notify_all()
+ return event
+
+
+cdef _close(_ChannelState state, grpc_status_code code, object details):
+ cdef _CallState call_state
+ encoded_details = _encode(details)
+ with state.condition:
+ if state.open:
+ state.open = False
+ for call_state in set(state.integrated_call_states.values()):
+ grpc_call_cancel_with_status(
+ call_state.c_call, code, encoded_details, NULL)
+ for call_state in state.segregated_call_states:
+ grpc_call_cancel_with_status(
+ call_state.c_call, code, encoded_details, NULL)
+ # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
+ # watching.
+
+ while state.integrated_call_states:
+ state.condition.wait()
+ while state.segregated_call_states:
+ state.condition.wait()
+ while state.connectivity_due:
+ state.condition.wait()
+
+ _destroy_c_completion_queue(state.c_call_completion_queue)
+ _destroy_c_completion_queue(state.c_connectivity_completion_queue)
+ grpc_channel_destroy(state.c_channel)
+ state.c_channel = NULL
+ grpc_shutdown()
+ state.condition.notify_all()
+ else:
+ # Another call to close already completed in the past or is currently
+ # being executed in another thread.
+ while state.c_channel != NULL:
+ state.condition.wait()
+
cdef class Channel:
- def __cinit__(self, bytes target, object arguments,
- ChannelCredentials channel_credentials=None):
+ def __cinit__(
+ self, bytes target, object arguments,
+ ChannelCredentials channel_credentials):
grpc_init()
+ self._state = _ChannelState()
self._vtable.copy = &_copy_pointer
self._vtable.destroy = &_destroy_pointer
self._vtable.cmp = &_compare_pointer
cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor(
arguments)
cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable)
- self.references = []
- c_target = target
if channel_credentials is None:
- self.c_channel = grpc_insecure_channel_create(c_target, c_arguments, NULL)
+ self._state.c_channel = grpc_insecure_channel_create(
+ <char *>target, c_arguments, NULL)
else:
c_channel_credentials = channel_credentials.c()
- self.c_channel = grpc_secure_channel_create(
- c_channel_credentials, c_target, c_arguments, NULL)
+ self._state.c_channel = grpc_secure_channel_create(
+ c_channel_credentials, <char *>target, c_arguments, NULL)
grpc_channel_credentials_release(c_channel_credentials)
- arguments_processor.un_c()
- self.references.append(target)
- self.references.append(arguments)
-
- def create_call(self, Call parent, int flags,
- CompletionQueue queue not None,
- method, host, object deadline):
- if queue.is_shutting_down:
- raise ValueError("queue must not be shutting down or shutdown")
- cdef grpc_slice method_slice = _slice_from_bytes(method)
- cdef grpc_slice host_slice
- cdef grpc_slice *host_slice_ptr = NULL
- if host is not None:
- host_slice = _slice_from_bytes(host)
- host_slice_ptr = &host_slice
- cdef Call operation_call = Call()
- operation_call.references = [self, queue]
- cdef grpc_call *parent_call = NULL
- if parent is not None:
- parent_call = parent.c_call
- operation_call.c_call = grpc_channel_create_call(
- self.c_channel, parent_call, flags,
- queue.c_completion_queue, method_slice, host_slice_ptr,
- _timespec_from_time(deadline), NULL)
- grpc_slice_unref(method_slice)
- if host_slice_ptr:
- grpc_slice_unref(host_slice)
- return operation_call
+ self._state.c_call_completion_queue = (
+ grpc_completion_queue_create_for_next(NULL))
+ self._state.c_connectivity_completion_queue = (
+ grpc_completion_queue_create_for_next(NULL))
+
+ def target(self):
+ cdef char *c_target
+ with self._state.condition:
+ c_target = grpc_channel_get_target(self._state.c_channel)
+ target = <bytes>c_target
+ gpr_free(c_target)
+ return target
+
+ def integrated_call(
+ self, int flags, method, host, object deadline, object metadata,
+ CallCredentials credentials, operationses_and_tags):
+ return _integrated_call(
+ self._state, flags, method, host, deadline, metadata, credentials,
+ operationses_and_tags)
+
+ def next_call_event(self):
+ def on_success(tag):
+ _process_integrated_call_tag(self._state, tag)
+ return _next_call_event(
+ self._state, self._state.c_call_completion_queue, on_success)
+
+ def segregated_call(
+ self, int flags, method, host, object deadline, object metadata,
+ CallCredentials credentials, operationses_and_tags):
+ return _segregated_call(
+ self._state, flags, method, host, deadline, metadata, credentials,
+ operationses_and_tags)
def check_connectivity_state(self, bint try_to_connect):
- cdef grpc_connectivity_state result
- with nogil:
- result = grpc_channel_check_connectivity_state(self.c_channel,
- try_to_connect)
- return result
+ with self._state.condition:
+ return grpc_channel_check_connectivity_state(
+ self._state.c_channel, try_to_connect)
def watch_connectivity_state(
- self, grpc_connectivity_state last_observed_state,
- object deadline, CompletionQueue queue not None, tag):
- cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag)
- cpython.Py_INCREF(connectivity_tag)
- grpc_channel_watch_connectivity_state(
- self.c_channel, last_observed_state, _timespec_from_time(deadline),
- queue.c_completion_queue, <cpython.PyObject *>connectivity_tag)
+ self, grpc_connectivity_state last_observed_state, object deadline):
+ return _watch_connectivity_state(self._state, last_observed_state, deadline)
- def target(self):
- cdef char *target = NULL
- with nogil:
- target = grpc_channel_get_target(self.c_channel)
- result = <bytes>target
- with nogil:
- gpr_free(target)
- return result
-
- def __dealloc__(self):
- if self.c_channel != NULL:
- grpc_channel_destroy(self.c_channel)
- grpc_shutdown()
+ def close(self, code, details):
+ _close(self._state, code, details)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
index 5ea0287b81..9f06ce086e 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
@@ -13,10 +13,16 @@
# limitations under the License.
+cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline)
+
+
+cdef _interpret_event(grpc_event c_event)
+
+
cdef class CompletionQueue:
cdef grpc_completion_queue *c_completion_queue
cdef bint is_shutting_down
cdef bint is_shutdown
- cdef _interpret_event(self, grpc_event event)
+ cdef _interpret_event(self, grpc_event c_event)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 40496d1124..a2d765546a 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -20,6 +20,53 @@ import time
cdef int _INTERRUPT_CHECK_PERIOD_MS = 200
+cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline):
+ cdef gpr_timespec c_increment
+ cdef gpr_timespec c_timeout
+ cdef gpr_timespec c_deadline
+ c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
+ if deadline is None:
+ c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
+ else:
+ c_deadline = _timespec_from_time(deadline)
+
+ with nogil:
+ while True:
+ c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
+ if gpr_time_cmp(c_timeout, c_deadline) > 0:
+ c_timeout = c_deadline
+ c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL)
+ if (c_event.type != GRPC_QUEUE_TIMEOUT or
+ gpr_time_cmp(c_timeout, c_deadline) == 0):
+ break
+
+ # Handle any signals
+ with gil:
+ cpython.PyErr_CheckSignals()
+ return c_event
+
+
+cdef _interpret_event(grpc_event c_event):
+ cdef _Tag tag
+ if c_event.type == GRPC_QUEUE_TIMEOUT:
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
+ elif c_event.type == GRPC_QUEUE_SHUTDOWN:
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None)
+ else:
+ tag = <_Tag>c_event.tag
+ # We receive event tags only after they've been inc-ref'd elsewhere in
+ # the code.
+ cpython.Py_DECREF(tag)
+ return tag, tag.event(c_event)
+
+
+cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline):
+ cdef grpc_event c_event = _next(c_completion_queue, deadline)
+ return _interpret_event(c_event)
+
+
cdef class CompletionQueue:
def __cinit__(self, shutdown_cq=False):
@@ -36,48 +83,16 @@ cdef class CompletionQueue:
self.is_shutting_down = False
self.is_shutdown = False
- cdef _interpret_event(self, grpc_event event):
- cdef _Tag tag = None
- if event.type == GRPC_QUEUE_TIMEOUT:
- # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
- return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
- elif event.type == GRPC_QUEUE_SHUTDOWN:
+ cdef _interpret_event(self, grpc_event c_event):
+ unused_tag, event = _interpret_event(c_event)
+ if event.completion_type == GRPC_QUEUE_SHUTDOWN:
self.is_shutdown = True
- # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
- return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None)
- else:
- tag = <_Tag>event.tag
- # We receive event tags only after they've been inc-ref'd elsewhere in
- # the code.
- cpython.Py_DECREF(tag)
- return tag.event(event)
+ return event
+ # We name this 'poll' to avoid problems with CPython's expectations for
+ # 'special' methods (like next and __next__).
def poll(self, deadline=None):
- # We name this 'poll' to avoid problems with CPython's expectations for
- # 'special' methods (like next and __next__).
- cdef gpr_timespec c_increment
- cdef gpr_timespec c_timeout
- cdef gpr_timespec c_deadline
- if deadline is None:
- c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
- else:
- c_deadline = _timespec_from_time(deadline)
- with nogil:
- c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
-
- while True:
- c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
- if gpr_time_cmp(c_timeout, c_deadline) > 0:
- c_timeout = c_deadline
- event = grpc_completion_queue_next(
- self.c_completion_queue, c_timeout, NULL)
- if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0:
- break;
-
- # Handle any signals
- with gil:
- cpython.PyErr_CheckSignals()
- return self._interpret_event(event)
+ return self._interpret_event(_next(self.c_completion_queue, deadline))
def shutdown(self):
with nogil:
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index a4c0319553..2d6c900c54 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -291,6 +291,7 @@ cdef extern from "grpc/grpc.h":
grpc_metadata_array *trailing_metadata
grpc_status_code *status
grpc_slice *status_details
+ char** error_string
ctypedef struct grpc_op_data_recv_close_on_server:
int *cancelled
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi
index bfbe27785b..69a2a4989e 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi
@@ -91,9 +91,11 @@ cdef class ReceiveStatusOnClientOperation(Operation):
cdef grpc_metadata_array _c_trailing_metadata
cdef grpc_status_code _c_code
cdef grpc_slice _c_details
+ cdef const char* _c_error_string
cdef tuple _trailing_metadata
cdef object _code
cdef str _details
+ cdef str _error_string
cdef void c(self)
cdef void un_c(self)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi
index 239d0f3f95..454627f570 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi
@@ -199,6 +199,8 @@ cdef class ReceiveStatusOnClientOperation(Operation):
&self._c_code)
self.c_op.data.receive_status_on_client.status_details = (
&self._c_details)
+ self.c_op.data.receive_status_on_client.error_string = (
+ &self._c_error_string)
cdef void un_c(self):
self._trailing_metadata = _metadata(&self._c_trailing_metadata)
@@ -206,6 +208,11 @@ cdef class ReceiveStatusOnClientOperation(Operation):
self._code = self._c_code
self._details = _decode(_slice_bytes(self._c_details))
grpc_slice_unref(self._c_details)
+ if self._c_error_string != NULL:
+ self._error_string = _decode(self._c_error_string)
+ gpr_free(<void*>self._c_error_string)
+ else:
+ self._error_string = ""
def trailing_metadata(self):
return self._trailing_metadata
@@ -216,6 +223,9 @@ cdef class ReceiveStatusOnClientOperation(Operation):
def details(self):
return self._details
+ def error_string(self):
+ return self._error_string
+
cdef class ReceiveCloseOnServerOperation(Operation):
diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py
index cb5da72f1f..ad53f60ad3 100644
--- a/src/python/grpcio/grpc/_grpcio_metadata.py
+++ b/src/python/grpcio/grpc/_grpcio_metadata.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!!
-__version__ = """1.12.0.dev0"""
+__version__ = """1.13.0.dev0"""
diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py
index d029472c68..f465e35a9c 100644
--- a/src/python/grpcio/grpc/_interceptor.py
+++ b/src/python/grpcio/grpc/_interceptor.py
@@ -334,6 +334,19 @@ class _Channel(grpc.Channel):
else:
return thunk(method)
+ def _close(self):
+ self._channel.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._close()
+ return False
+
+ def close(self):
+ self._channel.close()
+
def intercept_channel(channel, *interceptors):
for interceptor in reversed(list(interceptors)):
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index c988e0c87c..d849cadbee 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -780,14 +780,8 @@ def _start(state):
state.stage = _ServerStage.STARTED
_request_call(state)
- def cleanup_server(timeout):
- if timeout is None:
- _stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait()
- else:
- _stop(state, timeout).wait()
-
- thread = _common.CleanupThread(
- cleanup_server, target=_serve, args=(state,))
+ thread = threading.Thread(target=_serve, args=(state,))
+ thread.daemon = True
thread.start()
diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py
index 3c04fd7639..ccafec8951 100644
--- a/src/python/grpcio/grpc/beta/_server_adaptations.py
+++ b/src/python/grpcio/grpc/beta/_server_adaptations.py
@@ -168,11 +168,8 @@ def _run_request_pipe_thread(request_iterator, request_consumer,
return
request_consumer.terminate()
- def stop_request_pipe(timeout): # pylint: disable=unused-argument
- thread_joined.set()
-
- request_pipe_thread = _common.CleanupThread(
- stop_request_pipe, target=pipe_requests)
+ request_pipe_thread = threading.Thread(target=pipe_requests)
+ request_pipe_thread.daemon = True
request_pipe_thread.start()
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 81e6cb11e8..de5d2ba33f 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -28,7 +28,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/gpr/env_linux.cc',
'src/core/lib/gpr/env_posix.cc',
'src/core/lib/gpr/env_windows.cc',
- 'src/core/lib/gpr/fork.cc',
'src/core/lib/gpr/host_port.cc',
'src/core/lib/gpr/log.cc',
'src/core/lib/gpr/log_android.cc',
@@ -53,6 +52,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/gpr/tmpfile_posix.cc',
'src/core/lib/gpr/tmpfile_windows.cc',
'src/core/lib/gpr/wrap_memcpy.cc',
+ 'src/core/lib/gprpp/fork.cc',
'src/core/lib/gprpp/thd_posix.cc',
'src/core/lib/gprpp/thd_windows.cc',
'src/core/lib/profiling/basic_timers.cc',
@@ -64,7 +64,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/channel/channel_stack.cc',
'src/core/lib/channel/channel_stack_builder.cc',
'src/core/lib/channel/channel_trace.cc',
- 'src/core/lib/channel/channel_trace_registry.cc',
+ 'src/core/lib/channel/channelz_registry.cc',
'src/core/lib/channel/connected_channel.cc',
'src/core/lib/channel/handshaker.cc',
'src/core/lib/channel/handshaker_factory.cc',
@@ -296,7 +296,6 @@ CORE_SOURCE_FILES = [
'third_party/nanopb/pb_decode.c',
'third_party/nanopb/pb_encode.c',
'src/core/tsi/transport_security.cc',
- 'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/client/insecure/channel_create.cc',
'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc',
'src/core/ext/transport/chttp2/client/authority.cc',
@@ -344,7 +343,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
- 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',
diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py
index de5a780abd..57dc26dbeb 100644
--- a/src/python/grpcio/grpc_version.py
+++ b/src/python/grpcio/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!!
-VERSION = '1.12.0.dev0'
+VERSION = '1.13.0.dev0'
diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py
index afcd316e5c..ba0d4a3b6d 100644
--- a/src/python/grpcio_health_checking/grpc_version.py
+++ b/src/python/grpcio_health_checking/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!!
-VERSION = '1.12.0.dev0'
+VERSION = '1.13.0.dev0'
diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py
index 60d309ec65..35c09827ba 100644
--- a/src/python/grpcio_health_checking/setup.py
+++ b/src/python/grpcio_health_checking/setup.py
@@ -57,7 +57,7 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
- 'protobuf>=3.5.0.post1',
+ 'protobuf>=3.5.2.post1',
'grpcio>={version}'.format(version=grpc_version.VERSION),
)
diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py
index 824b73201d..ea2878d9ee 100644
--- a/src/python/grpcio_reflection/grpc_version.py
+++ b/src/python/grpcio_reflection/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!!
-VERSION = '1.12.0.dev0'
+VERSION = '1.13.0.dev0'
diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py
index 10c4c38f19..589d0ff556 100644
--- a/src/python/grpcio_reflection/setup.py
+++ b/src/python/grpcio_reflection/setup.py
@@ -58,7 +58,7 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
- 'protobuf>=3.5.0.post1',
+ 'protobuf>=3.5.2.post1',
'grpcio>={version}'.format(version=grpc_version.VERSION),
)
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py
index b015b8d738..0c1941e6be 100644
--- a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py
+++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py
@@ -56,6 +56,21 @@ class TestingChannel(grpc_testing.Channel):
response_deserializer=None):
return _multi_callable.StreamStream(method, self._state)
+ def _close(self):
+ # TODO(https://github.com/grpc/grpc/issues/12531): Decide what
+ # action to take here, if any?
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._close()
+ return False
+
+ def close(self):
+ self._close()
+
def take_unary_unary(self, method_descriptor):
return _channel_rpc.unary_unary(self._state, method_descriptor)
diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py
index 5b1f4c4cc0..02f19f2283 100644
--- a/src/python/grpcio_testing/grpc_version.py
+++ b/src/python/grpcio_testing/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!!
-VERSION = '1.12.0.dev0'
+VERSION = '1.13.0.dev0'
diff --git a/src/python/grpcio_testing/setup.py b/src/python/grpcio_testing/setup.py
index 5a9d593ec1..eb480a5464 100644
--- a/src/python/grpcio_testing/setup.py
+++ b/src/python/grpcio_testing/setup.py
@@ -29,7 +29,7 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
- 'protobuf>=3.5.0.post1',
+ 'protobuf>=3.5.2.post1',
'grpcio>={version}'.format(version=grpc_version.VERSION),
)
diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py
index 382f95018e..9d2e41644e 100644
--- a/src/python/grpcio_tests/grpc_version.py
+++ b/src/python/grpcio_tests/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!!
-VERSION = '1.12.0.dev0'
+VERSION = '1.13.0.dev0'
diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py
index 4e0f6726fa..1262e48571 100644
--- a/src/python/grpcio_tests/setup.py
+++ b/src/python/grpcio_tests/setup.py
@@ -37,13 +37,16 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
- 'coverage>=4.0', 'enum34>=1.0.4', 'futures>=2.2.0',
+ 'coverage>=4.0', 'enum34>=1.0.4',
'grpcio>={version}'.format(version=grpc_version.VERSION),
'grpcio-tools>={version}'.format(version=grpc_version.VERSION),
'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION),
- 'oauth2client>=1.4.7', 'protobuf>=3.5.0.post1', 'six>=1.10',
+ 'oauth2client>=1.4.7', 'protobuf>=3.5.2.post1', 'six>=1.10',
'google-auth>=1.0.0', 'requests>=2.14.2')
+if not PY3:
+ INSTALL_REQUIRES += ('futures>=2.2.0',)
+
COMMAND_CLASS = {
# Run `preprocess` *before* doing any packaging!
'preprocess': commands.GatherProto,
diff --git a/src/python/grpcio_tests/tests/_loader.py b/src/python/grpcio_tests/tests/_loader.py
index 31680916b4..be0af64646 100644
--- a/src/python/grpcio_tests/tests/_loader.py
+++ b/src/python/grpcio_tests/tests/_loader.py
@@ -54,7 +54,7 @@ class Loader(object):
for module in modules:
try:
package_paths = module.__path__
- except:
+ except AttributeError:
continue
self.walk_packages(package_paths)
coverage_context.stop()
diff --git a/src/python/grpcio_tests/tests/_result.py b/src/python/grpcio_tests/tests/_result.py
index 9907c4e1f9..b105f18e78 100644
--- a/src/python/grpcio_tests/tests/_result.py
+++ b/src/python/grpcio_tests/tests/_result.py
@@ -46,7 +46,7 @@ class CaseResult(
None.
"""
- class Kind:
+ class Kind(object):
UNTESTED = 'untested'
RUNNING = 'running'
ERROR = 'error'
@@ -257,7 +257,7 @@ class CoverageResult(AugmentedResult):
#coverage.Coverage().combine()
-class _Colors:
+class _Colors(object):
"""Namespaced constants for terminal color magic numbers."""
HEADER = '\033[95m'
INFO = '\033[94m'
diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py
index 3780ed9020..698c37017f 100644
--- a/src/python/grpcio_tests/tests/interop/client.py
+++ b/src/python/grpcio_tests/tests/interop/client.py
@@ -66,10 +66,6 @@ def _args():
return parser.parse_args()
-def _application_default_credentials():
- return oauth2client_client.GoogleCredentials.get_application_default()
-
-
def _stub(args):
target = '{}:{}'.format(args.server_host, args.server_port)
if args.test_case == 'oauth2_auth_token':
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
index ad0ecf0079..b46e53315e 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
@@ -329,9 +329,7 @@ class PythonPluginTest(unittest.TestCase):
_packagify(self._python_out)
- with _system_path([
- self._python_out,
- ]):
+ with _system_path([self._python_out]):
self._payload_pb2 = importlib.import_module(_PAYLOAD_PB2)
self._requests_pb2 = importlib.import_module(_REQUESTS_PB2)
self._responses_pb2 = importlib.import_module(_RESPONSES_PB2)
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py
index e6392a8b8c..0488450740 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_client.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py
@@ -22,7 +22,7 @@ from six.moves import queue
import grpc
from src.proto.grpc.testing import messages_pb2
-from src.proto.grpc.testing import services_pb2_grpc
+from src.proto.grpc.testing import benchmark_service_pb2_grpc
from tests.unit import resources
from tests.unit import test_common
@@ -58,7 +58,8 @@ class BenchmarkClient:
if config.payload_config.WhichOneof('payload') == 'simple_params':
self._generic = False
- self._stub = services_pb2_grpc.BenchmarkServiceStub(channel)
+ self._stub = benchmark_service_pb2_grpc.BenchmarkServiceStub(
+ channel)
payload = messages_pb2.Payload(
body='\0' * config.payload_config.simple_params.req_size)
self._request = messages_pb2.SimpleRequest(
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py
index bb07844491..2bd89cbbdf 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_server.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py
@@ -13,10 +13,10 @@
# limitations under the License.
from src.proto.grpc.testing import messages_pb2
-from src.proto.grpc.testing import services_pb2_grpc
+from src.proto.grpc.testing import benchmark_service_pb2_grpc
-class BenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer):
+class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer):
"""Synchronous Server implementation for the Benchmark service."""
def UnaryCall(self, request, context):
@@ -29,7 +29,8 @@ class BenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer):
yield messages_pb2.SimpleResponse(payload=payload)
-class GenericBenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer):
+class GenericBenchmarkServer(
+ benchmark_service_pb2_grpc.BenchmarkServiceServicer):
"""Generic Server implementation for the Benchmark service."""
def __init__(self, resp_size):
diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py
index 54f69db109..c33d013882 100644
--- a/src/python/grpcio_tests/tests/qps/qps_worker.py
+++ b/src/python/grpcio_tests/tests/qps/qps_worker.py
@@ -17,7 +17,7 @@ import argparse
import time
import grpc
-from src.proto.grpc.testing import services_pb2_grpc
+from src.proto.grpc.testing import worker_service_pb2_grpc
from tests.qps import worker_server
from tests.unit import test_common
@@ -26,7 +26,8 @@ from tests.unit import test_common
def run_worker_server(port):
server = test_common.test_server()
servicer = worker_server.WorkerServer()
- services_pb2_grpc.add_WorkerServiceServicer_to_server(servicer, server)
+ worker_service_pb2_grpc.add_WorkerServiceServicer_to_server(
+ servicer, server)
server.add_insecure_port('[::]:{}'.format(port))
server.start()
servicer.wait_for_quit()
diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py
index 41e2403c8f..db145fbf64 100644
--- a/src/python/grpcio_tests/tests/qps/worker_server.py
+++ b/src/python/grpcio_tests/tests/qps/worker_server.py
@@ -20,7 +20,7 @@ import time
from concurrent import futures
import grpc
from src.proto.grpc.testing import control_pb2
-from src.proto.grpc.testing import services_pb2_grpc
+from src.proto.grpc.testing import worker_service_pb2_grpc
from src.proto.grpc.testing import stats_pb2
from tests.qps import benchmark_client
@@ -31,7 +31,7 @@ from tests.unit import resources
from tests.unit import test_common
-class WorkerServer(services_pb2_grpc.WorkerServiceServicer):
+class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer):
"""Python Worker Server implementation."""
def __init__(self):
@@ -72,7 +72,7 @@ class WorkerServer(services_pb2_grpc.WorkerServiceServicer):
server = test_common.test_server(max_workers=server_threads)
if config.server_type == control_pb2.ASYNC_SERVER:
servicer = benchmark_server.BenchmarkServer()
- services_pb2_grpc.add_BenchmarkServiceServicer_to_server(
+ worker_service_pb2_grpc.add_BenchmarkServiceServicer_to_server(
servicer, server)
elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
resp_size = config.payload_config.bytebuf_params.resp_size
diff --git a/src/python/grpcio_tests/tests/stress/test_runner.py b/src/python/grpcio_tests/tests/stress/test_runner.py
index d5038e3ba2..764cda17fb 100644
--- a/src/python/grpcio_tests/tests/stress/test_runner.py
+++ b/src/python/grpcio_tests/tests/stress/test_runner.py
@@ -50,7 +50,7 @@ class TestRunner(threading.Thread):
test_case.test_interoperability(self._stub, None)
end_time = time.time()
self._histogram.add((end_time - start_time) * 1e9)
- except Exception as e:
+ except Exception as e: # pylint: disable=broad-except
traceback.print_exc()
self._exception_queue.put(
Exception("An exception occured during test {}"
diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py
index 7d0d74c8c4..3ddeba2373 100644
--- a/src/python/grpcio_tests/tests/testing/_client_application.py
+++ b/src/python/grpcio_tests/tests/testing/_client_application.py
@@ -215,30 +215,6 @@ def _run_infinite_request_stream(stub):
return _UNSATISFACTORY_OUTCOME
-def run(scenario, channel):
- stub = services_pb2_grpc.FirstServiceStub(channel)
- try:
- if scenario is Scenario.UNARY_UNARY:
- return _run_unary_unary(stub)
- elif scenario is Scenario.UNARY_STREAM:
- return _run_unary_stream(stub)
- elif scenario is Scenario.STREAM_UNARY:
- return _run_stream_unary(stub)
- elif scenario is Scenario.STREAM_STREAM:
- return _run_stream_stream(stub)
- elif scenario is Scenario.CONCURRENT_STREAM_UNARY:
- return _run_concurrent_stream_unary(stub)
- elif scenario is Scenario.CONCURRENT_STREAM_STREAM:
- return _run_concurrent_stream_stream(stub)
- elif scenario is Scenario.CANCEL_UNARY_UNARY:
- return _run_cancel_unary_unary(stub)
- elif scenario is Scenario.INFINITE_REQUEST_STREAM:
- return _run_infinite_request_stream(stub)
- except grpc.RpcError as rpc_error:
- return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(),
- rpc_error.details())
-
-
_IMPLEMENTATIONS = {
Scenario.UNARY_UNARY: _run_unary_unary,
Scenario.UNARY_STREAM: _run_unary_stream,
diff --git a/src/python/grpcio_tests/tests/testing/_server_application.py b/src/python/grpcio_tests/tests/testing/_server_application.py
index 02769ca68d..243c385daf 100644
--- a/src/python/grpcio_tests/tests/testing/_server_application.py
+++ b/src/python/grpcio_tests/tests/testing/_server_application.py
@@ -38,7 +38,7 @@ class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details('Something is wrong with your request!')
return
- yield services_pb2.Strange()
+ yield services_pb2.Strange() # pylint: disable=unreachable
def StreUn(self, request_iterator, context):
context.send_initial_metadata(((
diff --git a/src/python/grpcio_tests/tests/testing/_server_test.py b/src/python/grpcio_tests/tests/testing/_server_test.py
index 4f4abd7708..88e3a79ae5 100644
--- a/src/python/grpcio_tests/tests/testing/_server_test.py
+++ b/src/python/grpcio_tests/tests/testing/_server_test.py
@@ -21,13 +21,8 @@ import grpc_testing
from tests.testing import _application_common
from tests.testing import _application_testing_common
from tests.testing import _server_application
-from tests.testing.proto import services_pb2
-# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip.
-@unittest.skipIf(
- services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None,
- 'Fix protobuf issue 3452!')
class FirstServiceServicerTest(unittest.TestCase):
def setUp(self):
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index d38ee517f0..0d94426413 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -25,6 +25,7 @@
"unit._auth_test.AccessTokenAuthMetadataPluginTest",
"unit._auth_test.GoogleCallCredentialsTest",
"unit._channel_args_test.ChannelArgsTest",
+ "unit._channel_close_test.ChannelCloseTest",
"unit._channel_connectivity_test.ChannelConnectivityTest",
"unit._channel_ready_future_test.ChannelReadyFutureTest",
"unit._compression_test.CompressionTest",
@@ -52,7 +53,6 @@
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth",
- "unit._thread_cleanup_test.CleanupThreadTest",
"unit.beta._beta_features_test.BetaFeaturesTest",
"unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
"unit.beta._connectivity_channel_test.ConnectivityStatesTest",
diff --git a/src/python/grpcio_tests/tests/unit/_channel_close_test.py b/src/python/grpcio_tests/tests/unit/_channel_close_test.py
new file mode 100644
index 0000000000..af3a9ee1ee
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_channel_close_test.py
@@ -0,0 +1,185 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests server and client side compression."""
+
+import threading
+import time
+import unittest
+
+import grpc
+
+from tests.unit import test_common
+from tests.unit.framework.common import test_constants
+
+_BEAT = 0.5
+_SOME_TIME = 5
+_MORE_TIME = 10
+
+
+class _MethodHandler(grpc.RpcMethodHandler):
+
+ request_streaming = True
+ response_streaming = True
+ request_deserializer = None
+ response_serializer = None
+
+ def stream_stream(self, request_iterator, servicer_context):
+ for request in request_iterator:
+ yield request * 2
+
+
+_METHOD_HANDLER = _MethodHandler()
+
+
+class _GenericHandler(grpc.GenericRpcHandler):
+
+ def service(self, handler_call_details):
+ return _METHOD_HANDLER
+
+
+_GENERIC_HANDLER = _GenericHandler()
+
+
+class _Pipe(object):
+
+ def __init__(self, values):
+ self._condition = threading.Condition()
+ self._values = list(values)
+ self._open = True
+
+ def __iter__(self):
+ return self
+
+ def _next(self):
+ with self._condition:
+ while not self._values and self._open:
+ self._condition.wait()
+ if self._values:
+ return self._values.pop(0)
+ else:
+ raise StopIteration()
+
+ def next(self):
+ return self._next()
+
+ def __next__(self):
+ return self._next()
+
+ def add(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._condition.notify()
+
+ def close(self):
+ with self._condition:
+ self._open = False
+ self._condition.notify()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self.close()
+
+
+class ChannelCloseTest(unittest.TestCase):
+
+ def setUp(self):
+ self._server = test_common.test_server(
+ max_workers=test_constants.THREAD_CONCURRENCY)
+ self._server.add_generic_rpc_handlers((_GENERIC_HANDLER,))
+ self._port = self._server.add_insecure_port('[::]:0')
+ self._server.start()
+
+ def tearDown(self):
+ self._server.stop(None)
+
+ def test_close_immediately_after_call_invocation(self):
+ channel = grpc.insecure_channel('localhost:{}'.format(self._port))
+ multi_callable = channel.stream_stream('Meffod')
+ request_iterator = _Pipe(())
+ response_iterator = multi_callable(request_iterator)
+ channel.close()
+ request_iterator.close()
+
+ self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
+
+ def test_close_while_call_active(self):
+ channel = grpc.insecure_channel('localhost:{}'.format(self._port))
+ multi_callable = channel.stream_stream('Meffod')
+ request_iterator = _Pipe((b'abc',))
+ response_iterator = multi_callable(request_iterator)
+ next(response_iterator)
+ channel.close()
+ request_iterator.close()
+
+ self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
+
+ def test_context_manager_close_while_call_active(self):
+ with grpc.insecure_channel('localhost:{}'.format(
+ self._port)) as channel: # pylint: disable=bad-continuation
+ multi_callable = channel.stream_stream('Meffod')
+ request_iterator = _Pipe((b'abc',))
+ response_iterator = multi_callable(request_iterator)
+ next(response_iterator)
+ request_iterator.close()
+
+ self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
+
+ def test_context_manager_close_while_many_calls_active(self):
+ with grpc.insecure_channel('localhost:{}'.format(
+ self._port)) as channel: # pylint: disable=bad-continuation
+ multi_callable = channel.stream_stream('Meffod')
+ request_iterators = tuple(
+ _Pipe((b'abc',))
+ for _ in range(test_constants.THREAD_CONCURRENCY))
+ response_iterators = []
+ for request_iterator in request_iterators:
+ response_iterator = multi_callable(request_iterator)
+ next(response_iterator)
+ response_iterators.append(response_iterator)
+ for request_iterator in request_iterators:
+ request_iterator.close()
+
+ for response_iterator in response_iterators:
+ self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
+
+ def test_many_concurrent_closes(self):
+ channel = grpc.insecure_channel('localhost:{}'.format(self._port))
+ multi_callable = channel.stream_stream('Meffod')
+ request_iterator = _Pipe((b'abc',))
+ response_iterator = multi_callable(request_iterator)
+ next(response_iterator)
+ start = time.time()
+ end = start + _MORE_TIME
+
+ def sleep_some_time_then_close():
+ time.sleep(_SOME_TIME)
+ channel.close()
+
+ for _ in range(test_constants.THREAD_CONCURRENCY):
+ close_thread = threading.Thread(target=sleep_some_time_then_close)
+ close_thread.start()
+ while True:
+ request_iterator.add(b'def')
+ time.sleep(_BEAT)
+ if end < time.time():
+ break
+ request_iterator.close()
+
+ self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py
index 7550cd39ba..0b11f03adf 100644
--- a/src/python/grpcio_tests/tests/unit/_compression_test.py
+++ b/src/python/grpcio_tests/tests/unit/_compression_test.py
@@ -52,9 +52,9 @@ class _MethodHandler(grpc.RpcMethodHandler):
self.stream_unary = None
self.stream_stream = None
if self.request_streaming and self.response_streaming:
- self.stream_stream = lambda x, y: handle_stream(x, y)
+ self.stream_stream = handle_stream
elif not self.request_streaming and not self.response_streaming:
- self.unary_unary = lambda x, y: handle_unary(x, y)
+ self.unary_unary = handle_unary
class _GenericHandler(grpc.GenericRpcHandler):
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
index 3765ce4fb0..578a3d79ad 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
@@ -19,6 +19,7 @@ import unittest
from grpc._cython import cygrpc
from grpc.framework.foundation import logging_pool
from tests.unit.framework.common import test_constants
+from tests.unit._cython import test_utilities
_EMPTY_FLAGS = 0
_EMPTY_METADATA = ()
@@ -30,6 +31,8 @@ _RECEIVE_MESSAGE_TAG = 'receive_message'
_SERVER_COMPLETE_CALL_TAG = 'server_complete_call'
_SUCCESS_CALL_FRACTION = 1.0 / 8.0
+_SUCCESSFUL_CALLS = int(test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION)
+_UNSUCCESSFUL_CALLS = test_constants.RPC_CONCURRENCY - _SUCCESSFUL_CALLS
class _State(object):
@@ -43,7 +46,7 @@ class _State(object):
def _is_cancellation_event(event):
return (event.tag is _RECEIVE_CLOSE_ON_SERVER_TAG and
- event.batch_operations[0].received_cancelled)
+ event.batch_operations[0].cancelled())
class _Handler(object):
@@ -150,7 +153,8 @@ class CancelManyCallsTest(unittest.TestCase):
server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0')
server.start()
- channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None)
+ channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None,
+ None)
state = _State()
@@ -165,31 +169,33 @@ class CancelManyCallsTest(unittest.TestCase):
client_condition = threading.Condition()
client_due = set()
- client_completion_queue = cygrpc.CompletionQueue()
- client_driver = _QueueDriver(client_condition, client_completion_queue,
- client_due)
- client_driver.start()
with client_condition:
client_calls = []
for index in range(test_constants.RPC_CONCURRENCY):
- client_call = channel.create_call(None, _EMPTY_FLAGS,
- client_completion_queue,
- b'/twinkies', None, None)
- operations = (
- cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
- _EMPTY_FLAGS),
- cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS),
- cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
tag = 'client_complete_call_{0:04d}_tag'.format(index)
- client_call.start_client_batch(operations, tag)
+ client_call = channel.integrated_call(
+ _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA,
+ None, ((
+ (
+ cygrpc.SendInitialMetadataOperation(
+ _EMPTY_METADATA, _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'\x45\x56',
+ _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(
+ _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+ ),
+ tag,
+ ),))
client_due.add(tag)
client_calls.append(client_call)
+ client_events_future = test_utilities.SimpleFuture(
+ lambda: tuple(channel.next_call_event() for _ in range(_SUCCESSFUL_CALLS)))
+
with state.condition:
while True:
if state.parked_handlers < test_constants.THREAD_CONCURRENCY:
@@ -201,12 +207,14 @@ class CancelManyCallsTest(unittest.TestCase):
state.condition.notify_all()
break
- client_driver.events(
- test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION)
+ client_events_future.result()
with client_condition:
for client_call in client_calls:
- client_call.cancel()
+ client_call.cancel(cygrpc.StatusCode.cancelled, 'Cancelled!')
+ for _ in range(_UNSUCCESSFUL_CALLS):
+ channel.next_call_event()
+ channel.close(cygrpc.StatusCode.unknown, 'Cancelled on channel close!')
with state.condition:
server.shutdown(server_completion_queue, _SERVER_SHUTDOWN_TAG)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py
index 7305d0fa3f..d95286071d 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py
@@ -21,25 +21,20 @@ from grpc._cython import cygrpc
from tests.unit.framework.common import test_constants
-def _channel_and_completion_queue():
- channel = cygrpc.Channel(b'localhost:54321', ())
- completion_queue = cygrpc.CompletionQueue()
- return channel, completion_queue
+def _channel():
+ return cygrpc.Channel(b'localhost:54321', (), None)
-def _connectivity_loop(channel, completion_queue):
+def _connectivity_loop(channel):
for _ in range(100):
connectivity = channel.check_connectivity_state(True)
- channel.watch_connectivity_state(connectivity,
- time.time() + 0.2, completion_queue,
- None)
- completion_queue.poll()
+ channel.watch_connectivity_state(connectivity, time.time() + 0.2)
def _create_loop_destroy():
- channel, completion_queue = _channel_and_completion_queue()
- _connectivity_loop(channel, completion_queue)
- completion_queue.shutdown()
+ channel = _channel()
+ _connectivity_loop(channel)
+ channel.close(cygrpc.StatusCode.ok, 'Channel close!')
def _in_parallel(behavior, arguments):
@@ -55,12 +50,9 @@ def _in_parallel(behavior, arguments):
class ChannelTest(unittest.TestCase):
def test_single_channel_lonely_connectivity(self):
- channel, completion_queue = _channel_and_completion_queue()
- _in_parallel(_connectivity_loop, (
- channel,
- completion_queue,
- ))
- completion_queue.shutdown()
+ channel = _channel()
+ _connectivity_loop(channel)
+ channel.close(cygrpc.StatusCode.ok, 'Channel close!')
def test_multiple_channels_lonely_connectivity(self):
_in_parallel(_create_loop_destroy, ())
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py
index 7fd3d19b4e..d8210f36f8 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_common.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py
@@ -100,7 +100,8 @@ class RpcTest(object):
self.server.register_completion_queue(self.server_completion_queue)
port = self.server.add_http2_port(b'[::]:0')
self.server.start()
- self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), [])
+ self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), [],
+ None)
self._server_shutdown_tag = 'server_shutdown_tag'
self.server_condition = threading.Condition()
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
index 7caa98f72d..8a721788f4 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
@@ -19,6 +19,7 @@ import unittest
from grpc._cython import cygrpc
from tests.unit._cython import _common
+from tests.unit._cython import test_utilities
class Test(_common.RpcTest, unittest.TestCase):
@@ -41,31 +42,27 @@ class Test(_common.RpcTest, unittest.TestCase):
server_request_call_tag,
})
- client_call = self.channel.create_call(None, _common.EMPTY_FLAGS,
- self.client_completion_queue,
- b'/twinkies', None, None)
client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
client_complete_rpc_tag = 'client_complete_rpc_tag'
- with self.client_condition:
- client_receive_initial_metadata_start_batch_result = (
- client_call.start_client_batch([
- cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
- ], client_receive_initial_metadata_tag))
- self.assertEqual(cygrpc.CallError.ok,
- client_receive_initial_metadata_start_batch_result)
- client_complete_rpc_start_batch_result = client_call.start_client_batch(
+ client_call = self.channel.integrated_call(
+ _common.EMPTY_FLAGS, b'/twinkies', None, None,
+ _common.INVOCATION_METADATA, None, [(
[
- cygrpc.SendInitialMetadataOperation(
- _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
- cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
- ], client_complete_rpc_tag)
- self.assertEqual(cygrpc.CallError.ok,
- client_complete_rpc_start_batch_result)
- self.client_driver.add_due({
+ cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
+ ],
client_receive_initial_metadata_tag,
- client_complete_rpc_tag,
- })
+ )])
+ client_call.operate([
+ cygrpc.SendInitialMetadataOperation(_common.INVOCATION_METADATA,
+ _common.EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
+ ], client_complete_rpc_tag)
+
+ client_events_future = test_utilities.SimpleFuture(
+ lambda: [
+ self.channel.next_call_event(),
+ self.channel.next_call_event(),])
server_request_call_event = self.server_driver.event_with_tag(
server_request_call_tag)
@@ -96,20 +93,23 @@ class Test(_common.RpcTest, unittest.TestCase):
server_complete_rpc_event = server_call_driver.event_with_tag(
server_complete_rpc_tag)
- client_receive_initial_metadata_event = self.client_driver.event_with_tag(
- client_receive_initial_metadata_tag)
- client_complete_rpc_event = self.client_driver.event_with_tag(
- client_complete_rpc_tag)
+ client_events = client_events_future.result()
+ if client_events[0].tag is client_receive_initial_metadata_tag:
+ client_receive_initial_metadata_event = client_events[0]
+ client_complete_rpc_event = client_events[1]
+ else:
+ client_complete_rpc_event = client_events[0]
+ client_receive_initial_metadata_event = client_events[1]
return (
_common.OperationResult(server_request_call_start_batch_result,
server_request_call_event.completion_type,
server_request_call_event.success),
_common.OperationResult(
- client_receive_initial_metadata_start_batch_result,
+ cygrpc.CallError.ok,
client_receive_initial_metadata_event.completion_type,
client_receive_initial_metadata_event.success),
- _common.OperationResult(client_complete_rpc_start_batch_result,
+ _common.OperationResult(cygrpc.CallError.ok,
client_complete_rpc_event.completion_type,
client_complete_rpc_event.success),
_common.OperationResult(
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
index 8582a39c01..47f39ebce2 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
@@ -19,6 +19,7 @@ import unittest
from grpc._cython import cygrpc
from tests.unit._cython import _common
+from tests.unit._cython import test_utilities
class Test(_common.RpcTest, unittest.TestCase):
@@ -36,28 +37,31 @@ class Test(_common.RpcTest, unittest.TestCase):
server_request_call_tag,
})
- client_call = self.channel.create_call(None, _common.EMPTY_FLAGS,
- self.client_completion_queue,
- b'/twinkies', None, None)
client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
client_complete_rpc_tag = 'client_complete_rpc_tag'
- with self.client_condition:
- client_receive_initial_metadata_start_batch_result = (
- client_call.start_client_batch([
- cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
- ], client_receive_initial_metadata_tag))
- client_complete_rpc_start_batch_result = client_call.start_client_batch(
- [
- cygrpc.SendInitialMetadataOperation(
- _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
- cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
- ], client_complete_rpc_tag)
- self.client_driver.add_due({
- client_receive_initial_metadata_tag,
- client_complete_rpc_tag,
- })
-
+ client_call = self.channel.integrated_call(
+ _common.EMPTY_FLAGS, b'/twinkies', None, None,
+ _common.INVOCATION_METADATA, None, [
+ (
+ [
+ cygrpc.SendInitialMetadataOperation(
+ _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(
+ _common.EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(
+ _common.EMPTY_FLAGS),
+ ],
+ client_complete_rpc_tag,
+ ),
+ ])
+ client_call.operate([
+ cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
+ ], client_receive_initial_metadata_tag)
+
+ client_events_future = test_utilities.SimpleFuture(
+ lambda: [
+ self.channel.next_call_event(),
+ self.channel.next_call_event(),])
server_request_call_event = self.server_driver.event_with_tag(
server_request_call_tag)
@@ -87,20 +91,19 @@ class Test(_common.RpcTest, unittest.TestCase):
server_complete_rpc_event = self.server_driver.event_with_tag(
server_complete_rpc_tag)
- client_receive_initial_metadata_event = self.client_driver.event_with_tag(
- client_receive_initial_metadata_tag)
- client_complete_rpc_event = self.client_driver.event_with_tag(
- client_complete_rpc_tag)
+ client_events = client_events_future.result()
+ client_receive_initial_metadata_event = client_events[0]
+ client_complete_rpc_event = client_events[1]
return (
_common.OperationResult(server_request_call_start_batch_result,
server_request_call_event.completion_type,
server_request_call_event.success),
_common.OperationResult(
- client_receive_initial_metadata_start_batch_result,
+ cygrpc.CallError.ok,
client_receive_initial_metadata_event.completion_type,
client_receive_initial_metadata_event.success),
- _common.OperationResult(client_complete_rpc_start_batch_result,
+ _common.OperationResult(cygrpc.CallError.ok,
client_complete_rpc_event.completion_type,
client_complete_rpc_event.success),
_common.OperationResult(
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
index bc63b54879..8a903bfaf9 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
@@ -17,6 +17,7 @@ import threading
import unittest
from grpc._cython import cygrpc
+from tests.unit._cython import test_utilities
_EMPTY_FLAGS = 0
_EMPTY_METADATA = ()
@@ -118,7 +119,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0')
server.start()
- channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set())
+ channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set(),
+ None)
server_shutdown_tag = 'server_shutdown_tag'
server_driver = _ServerDriver(server_completion_queue,
@@ -127,10 +129,6 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
client_condition = threading.Condition()
client_due = set()
- client_completion_queue = cygrpc.CompletionQueue()
- client_driver = _QueueDriver(client_condition, client_completion_queue,
- client_due)
- client_driver.start()
server_call_condition = threading.Condition()
server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
@@ -154,25 +152,28 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server_completion_queue,
server_rpc_tag)
- client_call = channel.create_call(None, _EMPTY_FLAGS,
- client_completion_queue, b'/twinkies',
- None, None)
client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
client_complete_rpc_tag = 'client_complete_rpc_tag'
- with client_condition:
- client_receive_initial_metadata_start_batch_result = (
- client_call.start_client_batch([
- cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
- ], client_receive_initial_metadata_tag))
- client_due.add(client_receive_initial_metadata_tag)
- client_complete_rpc_start_batch_result = (
- client_call.start_client_batch([
- cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
- _EMPTY_FLAGS),
- cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- ], client_complete_rpc_tag))
- client_due.add(client_complete_rpc_tag)
+ client_call = channel.segregated_call(
+ _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA, None, (
+ (
+ [
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ ],
+ client_receive_initial_metadata_tag,
+ ),
+ (
+ [
+ cygrpc.SendInitialMetadataOperation(
+ _EMPTY_METADATA, _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+ ],
+ client_complete_rpc_tag,
+ ),
+ ))
+ client_receive_initial_metadata_event_future = test_utilities.SimpleFuture(
+ client_call.next_event)
server_rpc_event = server_driver.first_event()
@@ -208,19 +209,20 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server_complete_rpc_tag)
server_call_driver.events()
- with client_condition:
- client_receive_first_message_tag = 'client_receive_first_message_tag'
- client_receive_first_message_start_batch_result = (
- client_call.start_client_batch([
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- ], client_receive_first_message_tag))
- client_due.add(client_receive_first_message_tag)
- client_receive_first_message_event = client_driver.event_with_tag(
- client_receive_first_message_tag)
+ client_recieve_initial_metadata_event = client_receive_initial_metadata_event_future.result(
+ )
+
+ client_receive_first_message_tag = 'client_receive_first_message_tag'
+ client_call.operate([
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ ], client_receive_first_message_tag)
+ client_receive_first_message_event = client_call.next_event()
- client_call_cancel_result = client_call.cancel()
- client_driver.events()
+ client_call_cancel_result = client_call.cancel(
+ cygrpc.StatusCode.cancelled, 'Cancelled during test!')
+ client_complete_rpc_event = client_call.next_event()
+ channel.close(cygrpc.StatusCode.unknown, 'Channel closed!')
server.shutdown(server_completion_queue, server_shutdown_tag)
server.cancel_all_calls()
server_driver.events()
@@ -228,11 +230,6 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
self.assertEqual(cygrpc.CallError.ok, request_call_result)
self.assertEqual(cygrpc.CallError.ok,
server_send_initial_metadata_start_batch_result)
- self.assertEqual(cygrpc.CallError.ok,
- client_receive_initial_metadata_start_batch_result)
- self.assertEqual(cygrpc.CallError.ok,
- client_complete_rpc_start_batch_result)
- self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result)
self.assertIs(server_rpc_tag, server_rpc_event.tag)
self.assertEqual(cygrpc.CompletionType.operation_complete,
server_rpc_event.completion_type)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 9045ff58a0..724a690746 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -51,8 +51,8 @@ class TypeSmokeTest(unittest.TestCase):
del server
def testChannelUpDown(self):
- channel = cygrpc.Channel(b'[::]:0', None)
- del channel
+ channel = cygrpc.Channel(b'[::]:0', None, None)
+ channel.close(cygrpc.StatusCode.cancelled, 'Test method anyway!')
def test_metadata_plugin_call_credentials_up_down(self):
cygrpc.MetadataPluginCallCredentials(_metadata_plugin,
@@ -121,7 +121,7 @@ class ServerClientMixin(object):
client_credentials)
else:
self.client_channel = cygrpc.Channel('localhost:{}'.format(
- self.port).encode(), set())
+ self.port).encode(), set(), None)
if host_override:
self.host_argument = None # default host
self.expected_host = host_override
@@ -131,17 +131,20 @@ class ServerClientMixin(object):
self.expected_host = self.host_argument
def tearDownMixin(self):
+ self.client_channel.close(cygrpc.StatusCode.ok, 'test being torn down!')
+ del self.client_channel
del self.server
del self.client_completion_queue
del self.server_completion_queue
- def _perform_operations(self, operations, call, queue, deadline,
- description):
- """Perform the list of operations with given call, queue, and deadline.
+ def _perform_queue_operations(self, operations, call, queue, deadline,
+ description):
+ """Perform the operations with given call, queue, and deadline.
- Invocation errors are reported with as an exception with `description` in
- the message. Performs the operations asynchronously, returning a future.
- """
+ Invocation errors are reported with as an exception with `description`
+ in the message. Performs the operations asynchronously, returning a
+ future.
+ """
def performer():
tag = object()
@@ -185,9 +188,6 @@ class ServerClientMixin(object):
self.assertEqual(cygrpc.CallError.ok, request_call_result)
client_call_tag = object()
- client_call = self.client_channel.create_call(
- None, 0, self.client_completion_queue, METHOD, self.host_argument,
- DEADLINE)
client_initial_metadata = (
(
CLIENT_METADATA_ASCII_KEY,
@@ -198,18 +198,24 @@ class ServerClientMixin(object):
CLIENT_METADATA_BIN_VALUE,
),
)
- client_start_batch_result = client_call.start_client_batch([
- cygrpc.SendInitialMetadataOperation(client_initial_metadata,
- _EMPTY_FLAGS),
- cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS),
- cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- ], client_call_tag)
- self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
- client_event_future = test_utilities.CompletionQueuePollFuture(
- self.client_completion_queue, DEADLINE)
+ client_call = self.client_channel.integrated_call(
+ 0, METHOD, self.host_argument, DEADLINE, client_initial_metadata,
+ None, [
+ (
+ [
+ cygrpc.SendInitialMetadataOperation(
+ client_initial_metadata, _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+ ],
+ client_call_tag,
+ ),
+ ])
+ client_event_future = test_utilities.SimpleFuture(
+ self.client_channel.next_call_event)
request_event = self.server_completion_queue.poll(deadline=DEADLINE)
self.assertEqual(cygrpc.CompletionType.operation_complete,
@@ -285,7 +291,7 @@ class ServerClientMixin(object):
self.assertEqual(5, len(server_event.batch_operations))
found_server_op_types = set()
for server_result in server_event.batch_operations:
- self.assertNotIn(client_result.type(), found_server_op_types)
+ self.assertNotIn(server_result.type(), found_server_op_types)
found_server_op_types.add(server_result.type())
if server_result.type() == cygrpc.OperationType.receive_message:
self.assertEqual(REQUEST, server_result.message())
@@ -304,66 +310,76 @@ class ServerClientMixin(object):
del client_call
del server_call
- def test6522(self):
+ def test_6522(self):
DEADLINE = time.time() + 5
DEADLINE_TOLERANCE = 0.25
METHOD = b'twinkies'
empty_metadata = ()
+ # Prologue
server_request_tag = object()
self.server.request_call(self.server_completion_queue,
self.server_completion_queue,
server_request_tag)
- client_call = self.client_channel.create_call(
- None, 0, self.client_completion_queue, METHOD, self.host_argument,
- DEADLINE)
-
- # Prologue
- def perform_client_operations(operations, description):
- return self._perform_operations(operations, client_call,
- self.client_completion_queue,
- DEADLINE, description)
-
- client_event_future = perform_client_operations([
- cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
- cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
- ], "Client prologue")
+ client_call = self.client_channel.segregated_call(
+ 0, METHOD, self.host_argument, DEADLINE, None, None, ([(
+ [
+ cygrpc.SendInitialMetadataOperation(empty_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ ],
+ object(),
+ ), (
+ [
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+ ],
+ object(),
+ )]))
+
+ client_initial_metadata_event_future = test_utilities.SimpleFuture(
+ client_call.next_event)
request_event = self.server_completion_queue.poll(deadline=DEADLINE)
server_call = request_event.call
def perform_server_operations(operations, description):
- return self._perform_operations(operations, server_call,
- self.server_completion_queue,
- DEADLINE, description)
+ return self._perform_queue_operations(operations, server_call,
+ self.server_completion_queue,
+ DEADLINE, description)
server_event_future = perform_server_operations([
cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
], "Server prologue")
- client_event_future.result() # force completion
+ client_initial_metadata_event_future.result() # force completion
server_event_future.result()
# Messaging
for _ in range(10):
- client_event_future = perform_client_operations([
+ client_call.operate([
cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], "Client message")
+ client_message_event_future = test_utilities.SimpleFuture(
+ client_call.next_event)
server_event_future = perform_server_operations([
cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], "Server receive")
- client_event_future.result() # force completion
+ client_message_event_future.result() # force completion
server_event_future.result()
# Epilogue
- client_event_future = perform_client_operations([
+ client_call.operate([
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
], "Client epilogue")
+ # One for ReceiveStatusOnClient, one for SendCloseFromClient.
+ client_events_future = test_utilities.SimpleFuture(
+ lambda: {
+ client_call.next_event(),
+ client_call.next_event(),})
server_event_future = perform_server_operations([
cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
@@ -371,7 +387,7 @@ class ServerClientMixin(object):
empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS)
], "Server epilogue")
- client_event_future.result() # force completion
+ client_events_future.result() # force completion
server_event_future.result()
diff --git a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py
index 4a00b9ef2f..7d5eaaaa84 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py
@@ -25,7 +25,7 @@ class SimpleFuture(object):
def wrapped_function():
try:
self._result = function(*args, **kwargs)
- except Exception as error:
+ except Exception as error: # pylint: disable=broad-except
self._error = error
self._result = None
@@ -41,7 +41,7 @@ class SimpleFuture(object):
self._thread.join()
if self._error:
# TODO(atash): re-raise exceptions in a way that preserves tracebacks
- raise self._error
+ raise self._error # pylint: disable=raising-bad-type
return self._result
diff --git a/src/python/grpcio_tests/tests/unit/_exit_test.py b/src/python/grpcio_tests/tests/unit/_exit_test.py
index 6e6d9de0fb..f40f3ae07c 100644
--- a/src/python/grpcio_tests/tests/unit/_exit_test.py
+++ b/src/python/grpcio_tests/tests/unit/_exit_test.py
@@ -49,7 +49,7 @@ def cleanup_processes():
for process in processes:
try:
process.kill()
- except Exception:
+ except Exception: # pylint: disable=broad-except
pass
diff --git a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
index e683131722..ad847ae03e 100644
--- a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
+++ b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
@@ -14,7 +14,7 @@
_BEFORE_IMPORT = tuple(globals())
-from grpc import *
+from grpc import * # pylint: disable=wildcard-import
_AFTER_IMPORT = tuple(globals())
diff --git a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py
index 4edf0fc4ad..f153089a24 100644
--- a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py
+++ b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py
@@ -81,29 +81,16 @@ class InvalidMetadataTest(unittest.TestCase):
request = b'\x07\x08'
metadata = (('InVaLiD', 'UnaryRequestFutureUnaryResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
- response_future = self._unary_unary.future(request, metadata=metadata)
- with self.assertRaises(grpc.RpcError) as exception_context:
- response_future.result()
- self.assertEqual(exception_context.exception.details(),
- expected_error_details)
- self.assertEqual(exception_context.exception.code(),
- grpc.StatusCode.INTERNAL)
- self.assertEqual(response_future.details(), expected_error_details)
- self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL)
+ with self.assertRaises(ValueError) as exception_context:
+ self._unary_unary.future(request, metadata=metadata)
def testUnaryRequestStreamResponse(self):
request = b'\x37\x58'
metadata = (('InVaLiD', 'UnaryRequestStreamResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
- response_iterator = self._unary_stream(request, metadata=metadata)
- with self.assertRaises(grpc.RpcError) as exception_context:
- next(response_iterator)
- self.assertEqual(exception_context.exception.details(),
- expected_error_details)
- self.assertEqual(exception_context.exception.code(),
- grpc.StatusCode.INTERNAL)
- self.assertEqual(response_iterator.details(), expected_error_details)
- self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL)
+ with self.assertRaises(ValueError) as exception_context:
+ self._unary_stream(request, metadata=metadata)
+ self.assertIn(expected_error_details, str(exception_context.exception))
def testStreamRequestBlockingUnaryResponse(self):
request_iterator = (
@@ -129,32 +116,18 @@ class InvalidMetadataTest(unittest.TestCase):
b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
metadata = (('InVaLiD', 'StreamRequestFutureUnaryResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
- response_future = self._stream_unary.future(
- request_iterator, metadata=metadata)
- with self.assertRaises(grpc.RpcError) as exception_context:
- response_future.result()
- self.assertEqual(exception_context.exception.details(),
- expected_error_details)
- self.assertEqual(exception_context.exception.code(),
- grpc.StatusCode.INTERNAL)
- self.assertEqual(response_future.details(), expected_error_details)
- self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL)
+ with self.assertRaises(ValueError) as exception_context:
+ self._stream_unary.future(request_iterator, metadata=metadata)
+ self.assertIn(expected_error_details, str(exception_context.exception))
def testStreamRequestStreamResponse(self):
request_iterator = (
b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
metadata = (('InVaLiD', 'StreamRequestStreamResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
- response_iterator = self._stream_stream(
- request_iterator, metadata=metadata)
- with self.assertRaises(grpc.RpcError) as exception_context:
- next(response_iterator)
- self.assertEqual(exception_context.exception.details(),
- expected_error_details)
- self.assertEqual(exception_context.exception.code(),
- grpc.StatusCode.INTERNAL)
- self.assertEqual(response_iterator.details(), expected_error_details)
- self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL)
+ with self.assertRaises(ValueError) as exception_context:
+ self._stream_stream(request_iterator, metadata=metadata)
+ self.assertIn(expected_error_details, str(exception_context.exception))
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
index e40cca8b24..93a5fdf9ff 100644
--- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
+++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
@@ -165,11 +165,13 @@ class FailAfterFewIterationsCounter(object):
def __next__(self):
if self._current >= self._high:
- raise Exception("This is a deliberate failure in a unit test.")
+ raise test_control.Defect()
else:
self._current += 1
return self._bytestring
+ next = __next__
+
def _unary_unary_multi_callable(channel):
return channel.unary_unary(_UNARY_UNARY)
diff --git a/src/python/grpcio_tests/tests/unit/_rpc_test.py b/src/python/grpcio_tests/tests/unit/_rpc_test.py
index 54f01d9f8d..34e7831a98 100644
--- a/src/python/grpcio_tests/tests/unit/_rpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_rpc_test.py
@@ -225,6 +225,7 @@ class RPCTest(unittest.TestCase):
self.assertEqual(expected_response, response)
self.assertIs(grpc.StatusCode.OK, call.code())
+ self.assertEqual("", call.debug_error_string())
def testSuccessfulUnaryRequestFutureUnaryResponse(self):
request = b'\x07\x08'
@@ -706,6 +707,13 @@ class RPCTest(unittest.TestCase):
self.assertIs(grpc.StatusCode.UNKNOWN,
exception_context.exception.code())
+ # sanity checks on to make sure returned string contains default members
+ # of the error
+ debug_error_string = exception_context.exception.debug_error_string()
+ self.assertIn("created", debug_error_string)
+ self.assertIn("description", debug_error_string)
+ self.assertIn("file", debug_error_string)
+ self.assertIn("file_line", debug_error_string)
def testFailedUnaryRequestFutureUnaryResponse(self):
request = b'\x37\x17'
diff --git a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py
deleted file mode 100644
index 18f5af058a..0000000000
--- a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py
+++ /dev/null
@@ -1,115 +0,0 @@
-# Copyright 2016 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Tests for CleanupThread."""
-
-import threading
-import time
-import unittest
-
-from grpc import _common
-
-_SHORT_TIME = 0.5
-_LONG_TIME = 5.0
-_EPSILON = 0.5
-
-
-def cleanup(timeout):
- if timeout is not None:
- time.sleep(timeout)
- else:
- time.sleep(_LONG_TIME)
-
-
-def slow_cleanup(timeout):
- # Don't respect timeout
- time.sleep(_LONG_TIME)
-
-
-class CleanupThreadTest(unittest.TestCase):
-
- def testTargetInvocation(self):
- event = threading.Event()
-
- def target(arg1, arg2, arg3=None):
- self.assertEqual('arg1', arg1)
- self.assertEqual('arg2', arg2)
- self.assertEqual('arg3', arg3)
- event.set()
-
- cleanup_thread = _common.CleanupThread(
- behavior=lambda x: None,
- target=target,
- name='test-name',
- args=('arg1', 'arg2'),
- kwargs={
- 'arg3': 'arg3'
- })
- cleanup_thread.start()
- cleanup_thread.join()
- self.assertEqual(cleanup_thread.name, 'test-name')
- self.assertTrue(event.is_set())
-
- def testJoinNoTimeout(self):
- cleanup_thread = _common.CleanupThread(behavior=cleanup)
- cleanup_thread.start()
- start_time = time.time()
- cleanup_thread.join()
- end_time = time.time()
- self.assertAlmostEqual(
- _LONG_TIME, end_time - start_time, delta=_EPSILON)
-
- def testJoinTimeout(self):
- cleanup_thread = _common.CleanupThread(behavior=cleanup)
- cleanup_thread.start()
- start_time = time.time()
- cleanup_thread.join(_SHORT_TIME)
- end_time = time.time()
- self.assertAlmostEqual(
- _SHORT_TIME, end_time - start_time, delta=_EPSILON)
-
- def testJoinTimeoutSlowBehavior(self):
- cleanup_thread = _common.CleanupThread(behavior=slow_cleanup)
- cleanup_thread.start()
- start_time = time.time()
- cleanup_thread.join(_SHORT_TIME)
- end_time = time.time()
- self.assertAlmostEqual(
- _LONG_TIME, end_time - start_time, delta=_EPSILON)
-
- def testJoinTimeoutSlowTarget(self):
- event = threading.Event()
-
- def target():
- event.wait(_LONG_TIME)
-
- cleanup_thread = _common.CleanupThread(behavior=cleanup, target=target)
- cleanup_thread.start()
- start_time = time.time()
- cleanup_thread.join(_SHORT_TIME)
- end_time = time.time()
- self.assertAlmostEqual(
- _SHORT_TIME, end_time - start_time, delta=_EPSILON)
- event.set()
-
- def testJoinZeroTimeout(self):
- cleanup_thread = _common.CleanupThread(behavior=cleanup)
- cleanup_thread.start()
- start_time = time.time()
- cleanup_thread.join(0)
- end_time = time.time()
- self.assertAlmostEqual(0, end_time - start_time, delta=_EPSILON)
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
index 61c03f64ba..b43c647fc9 100644
--- a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
+++ b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
@@ -65,7 +65,7 @@ class _Servicer(object):
self._serviced = True
self._condition.notify_all()
return
- yield
+ yield # pylint: disable=unreachable
def stream_unary(self, request_iterator, context):
for request in request_iterator: