Diffstat (limited to 'src/python')
-rw-r--r-- src/python/grpcio/grpc/__init__.py | 17
-rw-r--r-- src/python/grpcio/grpc/_channel.py | 423
-rw-r--r-- src/python/grpcio/grpc/_common.py | 34
-rw-r--r-- src/python/grpcio/grpc/_cython/.gitignore | 2
-rw-r--r-- src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi | 5
-rw-r--r-- src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi | 56
-rw-r--r-- src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi | 477
-rw-r--r-- src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi | 8
-rw-r--r-- src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi | 93
-rw-r--r-- src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd | 152
-rw-r--r-- src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx | 448
-rw-r--r-- src/python/grpcio/grpc/_cython/cygrpc.pxd | 3
-rw-r--r-- src/python/grpcio/grpc/_cython/cygrpc.pyx | 3
-rw-r--r-- src/python/grpcio/grpc/_grpcio_metadata.py | 2
-rw-r--r-- src/python/grpcio/grpc/_interceptor.py | 13
-rw-r--r-- src/python/grpcio/grpc/_server.py | 10
-rw-r--r-- src/python/grpcio/grpc/beta/_server_adaptations.py | 7
-rw-r--r-- src/python/grpcio/grpc/experimental/__init__.py (renamed from src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py) | 6
-rw-r--r-- src/python/grpcio/grpc/experimental/gevent.py (renamed from src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py) | 20
-rw-r--r-- src/python/grpcio/grpc_core_dependencies.py | 35
-rw-r--r-- src/python/grpcio/grpc_version.py | 2
-rw-r--r-- src/python/grpcio_health_checking/grpc_version.py | 2
-rw-r--r-- src/python/grpcio_health_checking/setup.py | 2
-rw-r--r-- src/python/grpcio_reflection/README.rst | 10
-rw-r--r-- src/python/grpcio_reflection/grpc_version.py | 2
-rw-r--r-- src/python/grpcio_reflection/setup.py | 2
-rw-r--r-- src/python/grpcio_testing/grpc_testing/_channel/_channel.py | 15
-rw-r--r-- src/python/grpcio_testing/grpc_version.py | 2
-rw-r--r-- src/python/grpcio_testing/setup.py | 2
-rw-r--r-- src/python/grpcio_tests/commands.py | 51
-rw-r--r-- src/python/grpcio_tests/grpc_version.py | 2
-rw-r--r-- src/python/grpcio_tests/setup.py | 5
-rw-r--r-- src/python/grpcio_tests/tests/_loader.py | 2
-rw-r--r-- src/python/grpcio_tests/tests/_result.py | 4
-rw-r--r-- src/python/grpcio_tests/tests/_runner.py | 52
-rw-r--r-- src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py | 3
-rw-r--r-- src/python/grpcio_tests/tests/interop/_secure_intraop_test.py | 3
-rw-r--r-- src/python/grpcio_tests/tests/interop/client.py | 4
-rw-r--r-- src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py | 22
-rw-r--r-- src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py | 1
-rw-r--r-- src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py | 4
-rw-r--r-- src/python/grpcio_tests/tests/qps/benchmark_client.py | 5
-rw-r--r-- src/python/grpcio_tests/tests/qps/benchmark_server.py | 7
-rw-r--r-- src/python/grpcio_tests/tests/qps/qps_worker.py | 5
-rw-r--r-- src/python/grpcio_tests/tests/qps/worker_server.py | 6
-rw-r--r-- src/python/grpcio_tests/tests/stress/test_runner.py | 2
-rw-r--r-- src/python/grpcio_tests/tests/testing/_client_application.py | 24
-rw-r--r-- src/python/grpcio_tests/tests/testing/_server_application.py | 2
-rw-r--r-- src/python/grpcio_tests/tests/testing/_server_test.py | 5
-rw-r--r-- src/python/grpcio_tests/tests/tests.json | 8
-rw-r--r-- src/python/grpcio_tests/tests/unit/_auth_context_test.py | 3
-rw-r--r-- src/python/grpcio_tests/tests/unit/_channel_close_test.py | 185
-rw-r--r-- src/python/grpcio_tests/tests/unit/_compression_test.py | 4
-rw-r--r-- src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py | 52
-rw-r--r-- src/python/grpcio_tests/tests/unit/_cython/_channel_test.py | 28
-rw-r--r-- src/python/grpcio_tests/tests/unit/_cython/_common.py | 3
-rw-r--r-- src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py | 54
-rw-r--r-- src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py | 55
-rw-r--r-- src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py | 73
-rw-r--r-- src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py | 114
-rw-r--r-- src/python/grpcio_tests/tests/unit/_cython/test_utilities.py | 4
-rw-r--r-- src/python/grpcio_tests/tests/unit/_exit_test.py | 2
-rw-r--r-- src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py | 2
-rw-r--r-- src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py | 49
-rw-r--r-- src/python/grpcio_tests/tests/unit/_invocation_defects_test.py | 4
-rw-r--r-- src/python/grpcio_tests/tests/unit/_reconnect_test.py | 5
-rw-r--r-- src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py | 115
-rw-r--r-- src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py | 2
-rw-r--r-- src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py | 132
-rw-r--r-- src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py | 13
-rw-r--r-- src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py | 287
-rw-r--r-- src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py | 432
-rw-r--r-- src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py | 508
-rw-r--r-- src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py | 198
-rw-r--r-- src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py | 304
-rw-r--r-- src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py | 390
-rw-r--r-- src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py | 53
-rw-r--r-- src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py | 212
78 files changed, 1993 insertions, 3360 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 7fa7303691..b7ed0c8563 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -813,7 +813,11 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
class Channel(six.with_metaclass(abc.ABCMeta)):
- """Affords RPC invocation via generic methods on client-side."""
+ """Affords RPC invocation via generic methods on client-side.
+
+ Channel objects implement the Context Manager type, although they need not
+ support being entered and exited multiple times.
+ """
@abc.abstractmethod
def subscribe(self, callback, try_to_connect=False):
@@ -926,6 +930,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)):
"""
raise NotImplementedError()
+ @abc.abstractmethod
+ def close(self):
+ """Closes this Channel and releases all resources held by it.
+
+ Closing the Channel will immediately terminate all RPCs active with the
+ Channel and it is not valid to invoke new RPCs with the Channel.
+
+ This method is idempotent.
+ """
+ raise NotImplementedError()
+
########################## Service-Side Context ##############################
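
The close() method and context-manager note added above translate into the following usage pattern for application code (a minimal sketch; the address and the RPC work inside the blocks are placeholders rather than part of this change):

    import grpc

    # Explicit lifecycle management via the new close() method.
    channel = grpc.insecure_channel('localhost:50051')
    try:
        pass  # invoke RPCs through stubs created from this channel
    finally:
        channel.close()  # idempotent; terminates any RPCs still active on the channel

    # Equivalent, using the context-manager support noted in the Channel docstring.
    with grpc.insecure_channel('localhost:50051') as channel:
        pass  # the channel is closed automatically on exit
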
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 25a4210974..8cc0e981ef 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -79,27 +79,6 @@ def _wait_once_until(condition, until):
condition.wait(timeout=remaining)
-_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
- 'Internal gRPC call error %d. ' +
- 'Please report to https://github.com/grpc/grpc/issues')
-
-
-def _check_call_error(call_error, metadata):
- if call_error == cygrpc.CallError.invalid_metadata:
- raise ValueError('metadata was invalid: %s' % metadata)
- elif call_error != cygrpc.CallError.ok:
- raise ValueError(_INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error)
-
-
-def _call_error_set_RPCstate(state, call_error, metadata):
- if call_error == cygrpc.CallError.invalid_metadata:
- _abort(state, grpc.StatusCode.INTERNAL,
- 'metadata was invalid: %s' % metadata)
- else:
- _abort(state, grpc.StatusCode.INTERNAL,
- _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error)
-
-
class _RPCState(object):
def __init__(self, due, initial_metadata, trailing_metadata, code, details):
@@ -163,7 +142,7 @@ def _handle_event(event, state, response_deserializer):
return callbacks
-def _event_handler(state, call, response_deserializer):
+def _event_handler(state, response_deserializer):
def handle_event(event):
with state.condition:
@@ -172,40 +151,47 @@ def _event_handler(state, call, response_deserializer):
done = not state.due
for callback in callbacks:
callback()
- return call if done else None
+ return done
return handle_event
-def _consume_request_iterator(request_iterator, state, call,
- request_serializer):
- event_handler = _event_handler(state, call, None)
+def _consume_request_iterator(request_iterator, state, call, request_serializer,
+ event_handler):
- def consume_request_iterator():
+ def consume_request_iterator(): # pylint: disable=too-many-branches
while True:
try:
request = next(request_iterator)
except StopIteration:
break
except Exception: # pylint: disable=broad-except
- logging.exception("Exception iterating requests!")
- call.cancel()
- _abort(state, grpc.StatusCode.UNKNOWN,
- "Exception iterating requests!")
+ code = grpc.StatusCode.UNKNOWN
+ details = 'Exception iterating requests!'
+ logging.exception(details)
+ call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
+ details)
+ _abort(state, code, details)
return
serialized_request = _common.serialize(request, request_serializer)
with state.condition:
if state.code is None and not state.cancelled:
if serialized_request is None:
- call.cancel()
+ code = grpc.StatusCode.INTERNAL # pylint: disable=redefined-variable-type
details = 'Exception serializing request!'
- _abort(state, grpc.StatusCode.INTERNAL, details)
+ call.cancel(
+ _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
+ details)
+ _abort(state, code, details)
return
else:
operations = (cygrpc.SendMessageOperation(
serialized_request, _EMPTY_FLAGS),)
- call.start_client_batch(operations, event_handler)
- state.due.add(cygrpc.OperationType.send_message)
+ operating = call.operate(operations, event_handler)
+ if operating:
+ state.due.add(cygrpc.OperationType.send_message)
+ else:
+ return
while True:
state.condition.wait()
if state.code is None:
@@ -219,19 +205,12 @@ def _consume_request_iterator(request_iterator, state, call,
if state.code is None:
operations = (
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
- call.start_client_batch(operations, event_handler)
- state.due.add(cygrpc.OperationType.send_close_from_client)
+ operating = call.operate(operations, event_handler)
+ if operating:
+ state.due.add(cygrpc.OperationType.send_close_from_client)
- def stop_consumption_thread(timeout): # pylint: disable=unused-argument
- with state.condition:
- if state.code is None:
- call.cancel()
- state.cancelled = True
- _abort(state, grpc.StatusCode.CANCELLED, 'Cancelled!')
- state.condition.notify_all()
-
- consumption_thread = _common.CleanupThread(
- stop_consumption_thread, target=consume_request_iterator)
+ consumption_thread = threading.Thread(target=consume_request_iterator)
+ consumption_thread.daemon = True
consumption_thread.start()
@@ -247,9 +226,12 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
def cancel(self):
with self._state.condition:
if self._state.code is None:
- self._call.cancel()
+ code = grpc.StatusCode.CANCELLED
+ details = 'Locally cancelled by application!'
+ self._call.cancel(
+ _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
self._state.cancelled = True
- _abort(self._state, grpc.StatusCode.CANCELLED, 'Cancelled!')
+ _abort(self._state, code, details)
self._state.condition.notify_all()
return False
@@ -318,12 +300,13 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
def _next(self):
with self._state.condition:
if self._state.code is None:
- event_handler = _event_handler(self._state, self._call,
+ event_handler = _event_handler(self._state,
self._response_deserializer)
- self._call.start_client_batch(
+ operating = self._call.operate(
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
event_handler)
- self._state.due.add(cygrpc.OperationType.receive_message)
+ if operating:
+ self._state.due.add(cygrpc.OperationType.receive_message)
elif self._state.code is grpc.StatusCode.OK:
raise StopIteration()
else:
@@ -408,9 +391,12 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
def __del__(self):
with self._state.condition:
if self._state.code is None:
- self._call.cancel()
- self._state.cancelled = True
self._state.code = grpc.StatusCode.CANCELLED
+ self._state.details = 'Cancelled upon garbage collection!'
+ self._state.cancelled = True
+ self._call.cancel(
+ _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
+ self._state.details)
self._state.condition.notify_all()
@@ -437,6 +423,24 @@ def _end_unary_response_blocking(state, call, with_call, deadline):
raise _Rendezvous(state, None, None, deadline)
+def _stream_unary_invocation_operationses(metadata):
+ return (
+ (
+ cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+ ),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+ )
+
+
+def _stream_unary_invocation_operationses_and_tags(metadata):
+ return tuple((
+ operations,
+ None,
+ ) for operations in _stream_unary_invocation_operationses(metadata))
+
+
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def __init__(self, channel, managed_call, method, request_serializer,
@@ -448,8 +452,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
self._response_deserializer = response_deserializer
def _prepare(self, request, timeout, metadata):
- deadline, serialized_request, rendezvous = (_start_unary_request(
- request, timeout, self._request_serializer))
+ deadline, serialized_request, rendezvous = _start_unary_request(
+ request, timeout, self._request_serializer)
if serialized_request is None:
return None, None, None, rendezvous
else:
@@ -467,48 +471,38 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def _blocking(self, request, timeout, metadata, credentials):
state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata)
- if rendezvous:
+ if state is None:
raise rendezvous
else:
- completion_queue = cygrpc.CompletionQueue()
- call = self._channel.create_call(None, 0, completion_queue,
- self._method, None, deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- call_error = call.start_client_batch(operations, None)
- _check_call_error(call_error, metadata)
- _handle_event(completion_queue.poll(), state,
- self._response_deserializer)
- return state, call, deadline
+ call = self._channel.segregated_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials, ((
+ operations,
+ None,
+ ),))
+ event = call.next_event()
+ _handle_event(event, state, self._response_deserializer)
+ return state, call,
def __call__(self, request, timeout=None, metadata=None, credentials=None):
- state, call, deadline = self._blocking(request, timeout, metadata,
- credentials)
- return _end_unary_response_blocking(state, call, False, deadline)
+ state, call, = self._blocking(request, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, call, False, None)
def with_call(self, request, timeout=None, metadata=None, credentials=None):
- state, call, deadline = self._blocking(request, timeout, metadata,
- credentials)
- return _end_unary_response_blocking(state, call, True, deadline)
+ state, call, = self._blocking(request, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, call, True, None)
def future(self, request, timeout=None, metadata=None, credentials=None):
state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata)
- if rendezvous:
- return rendezvous
+ if state is None:
+ raise rendezvous
else:
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call,
- self._response_deserializer)
- with state.condition:
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ (operations,), event_handler)
return _Rendezvous(state, call, self._response_deserializer,
deadline)
@@ -524,34 +518,27 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._response_deserializer = response_deserializer
def __call__(self, request, timeout=None, metadata=None, credentials=None):
- deadline, serialized_request, rendezvous = (_start_unary_request(
- request, timeout, self._request_serializer))
+ deadline, serialized_request, rendezvous = _start_unary_request(
+ request, timeout, self._request_serializer)
if serialized_request is None:
raise rendezvous
else:
state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call,
- self._response_deserializer)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
- event_handler)
- operations = (
+ operationses = (
+ (
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
cygrpc.SendMessageOperation(serialized_request,
_EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
+ ),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+ )
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ operationses, event_handler)
return _Rendezvous(state, call, self._response_deserializer,
deadline)
@@ -569,49 +556,38 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
def _blocking(self, request_iterator, timeout, metadata, credentials):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
- completion_queue = cygrpc.CompletionQueue()
- call = self._channel.create_call(None, 0, completion_queue,
- self._method, None, deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None)
- operations = (
- cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, None)
- _check_call_error(call_error, metadata)
- _consume_request_iterator(request_iterator, state, call,
- self._request_serializer)
+ call = self._channel.segregated_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ _stream_unary_invocation_operationses_and_tags(metadata))
+ _consume_request_iterator(request_iterator, state, call,
+ self._request_serializer, None)
while True:
- event = completion_queue.poll()
+ event = call.next_event()
with state.condition:
_handle_event(event, state, self._response_deserializer)
state.condition.notify_all()
if not state.due:
break
- return state, call, deadline
+ return state, call,
def __call__(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
- state, call, deadline = self._blocking(request_iterator, timeout,
- metadata, credentials)
- return _end_unary_response_blocking(state, call, False, deadline)
+ state, call, = self._blocking(request_iterator, timeout, metadata,
+ credentials)
+ return _end_unary_response_blocking(state, call, False, None)
def with_call(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
- state, call, deadline = self._blocking(request_iterator, timeout,
- metadata, credentials)
- return _end_unary_response_blocking(state, call, True, deadline)
+ state, call, = self._blocking(request_iterator, timeout, metadata,
+ credentials)
+ return _end_unary_response_blocking(state, call, True, None)
def future(self,
request_iterator,
@@ -620,27 +596,13 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
credentials=None):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call, self._response_deserializer)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
- event_handler)
- operations = (
- cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
- _consume_request_iterator(request_iterator, state, call,
- self._request_serializer)
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ _stream_unary_invocation_operationses(metadata), event_handler)
+ _consume_request_iterator(request_iterator, state, call,
+ self._request_serializer, event_handler)
return _Rendezvous(state, call, self._response_deserializer, deadline)
@@ -661,26 +623,20 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
credentials=None):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call, self._response_deserializer)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
- event_handler)
- operations = (
+ operationses = (
+ (
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
- _consume_request_iterator(request_iterator, state, call,
- self._request_serializer)
+ ),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+ )
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials, operationses,
+ event_handler)
+ _consume_request_iterator(request_iterator, state, call,
+ self._request_serializer, event_handler)
return _Rendezvous(state, call, self._response_deserializer, deadline)
@@ -689,67 +645,63 @@ class _ChannelCallState(object):
def __init__(self, channel):
self.lock = threading.Lock()
self.channel = channel
- self.completion_queue = cygrpc.CompletionQueue()
- self.managed_calls = None
+ self.managed_calls = 0
def _run_channel_spin_thread(state):
def channel_spin():
while True:
- event = state.completion_queue.poll()
- completed_call = event.tag(event)
- if completed_call is not None:
+ event = state.channel.next_call_event()
+ call_completed = event.tag(event)
+ if call_completed:
with state.lock:
- state.managed_calls.remove(completed_call)
- if not state.managed_calls:
- state.managed_calls = None
+ state.managed_calls -= 1
+ if state.managed_calls == 0:
return
- def stop_channel_spin(timeout): # pylint: disable=unused-argument
- with state.lock:
- if state.managed_calls is not None:
- for call in state.managed_calls:
- call.cancel()
-
- channel_spin_thread = _common.CleanupThread(
- stop_channel_spin, target=channel_spin)
+ channel_spin_thread = threading.Thread(target=channel_spin)
+ channel_spin_thread.daemon = True
channel_spin_thread.start()
def _channel_managed_call_management(state):
- def create(parent, flags, method, host, deadline):
- """Creates a managed cygrpc.Call and a function to call to drive it.
-
- If operations are successfully added to the returned cygrpc.Call, the
- returned function must be called. If operations are not successfully added
- to the returned cygrpc.Call, the returned function must not be called.
-
- Args:
- parent: A cygrpc.Call to be used as the parent of the created call.
- flags: An integer bitfield of call flags.
- method: The RPC method.
- host: A host string for the created call.
- deadline: A float to be the deadline of the created call or None if the
- call is to have an infinite deadline.
-
- Returns:
- A cygrpc.Call with which to conduct an RPC and a function to call if
- operations are successfully started on the call.
- """
- call = state.channel.create_call(parent, flags, state.completion_queue,
- method, host, deadline)
-
- def drive():
- with state.lock:
- if state.managed_calls is None:
- state.managed_calls = set((call,))
- _run_channel_spin_thread(state)
- else:
- state.managed_calls.add(call)
+ # pylint: disable=too-many-arguments
+ def create(flags, method, host, deadline, metadata, credentials,
+ operationses, event_handler):
+ """Creates a cygrpc.IntegratedCall.
- return call, drive
+ Args:
+ flags: An integer bitfield of call flags.
+ method: The RPC method.
+ host: A host string for the created call.
+ deadline: A float to be the deadline of the created call or None if
+ the call is to have an infinite deadline.
+ metadata: The metadata for the call or None.
+ credentials: A cygrpc.CallCredentials or None.
+ operationses: An iterable of iterables of cygrpc.Operations to be
+ started on the call.
+ event_handler: A behavior to call to handle the events resultant from
+ the operations on the call.
+
+ Returns:
+ A cygrpc.IntegratedCall with which to conduct an RPC.
+ """
+ operationses_and_tags = tuple((
+ operations,
+ event_handler,
+ ) for operations in operationses)
+ with state.lock:
+ call = state.channel.integrated_call(flags, method, host, deadline,
+ metadata, credentials,
+ operationses_and_tags)
+ if state.managed_calls == 0:
+ state.managed_calls = 1
+ _run_channel_spin_thread(state)
+ else:
+ state.managed_calls += 1
+ return call
return create
@@ -819,12 +771,9 @@ def _poll_connectivity(state, channel, initial_try_to_connect):
callback_and_connectivity[1] = state.connectivity
if callbacks:
_spawn_delivery(state, callbacks)
- completion_queue = cygrpc.CompletionQueue()
while True:
- channel.watch_connectivity_state(connectivity,
- time.time() + 0.2, completion_queue,
- None)
- event = completion_queue.poll()
+ event = channel.watch_connectivity_state(connectivity,
+ time.time() + 0.2)
with state.lock:
if not state.callbacks_and_connectivities and not state.try_to_connect:
state.polling = False
@@ -855,10 +804,10 @@ def _moot(state):
def _subscribe(state, callback, try_to_connect):
with state.lock:
if not state.callbacks_and_connectivities and not state.polling:
- polling_thread = _common.CleanupThread(
- lambda timeout: _moot(state),
+ polling_thread = threading.Thread(
target=_poll_connectivity,
args=(state, state.channel, bool(try_to_connect)))
+ polling_thread.daemon = True
polling_thread.start()
state.polling = True
state.callbacks_and_connectivities.append([callback, None])
@@ -906,11 +855,6 @@ class Channel(grpc.Channel):
self._call_state = _ChannelCallState(self._channel)
self._connectivity_state = _ChannelConnectivityState(self._channel)
- # TODO(https://github.com/grpc/grpc/issues/9884)
- # Temporary work around UNAVAILABLE issues
- # Remove this once c-core has retry support
- _subscribe(self._connectivity_state, lambda *args: None, None)
-
def subscribe(self, callback, try_to_connect=None):
_subscribe(self._connectivity_state, callback, try_to_connect)
@@ -949,5 +893,28 @@ class Channel(grpc.Channel):
self._channel, _channel_managed_call_management(self._call_state),
_common.encode(method), request_serializer, response_deserializer)
+ def _close(self):
+ self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
+ _moot(self._connectivity_state)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._close()
+ return False
+
+ def close(self):
+ self._close()
+
def __del__(self):
+ # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
+ # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
+ # here (or more likely, call self._close() here). We don't do this today
+ # because many valid use cases today allow the channel to be deleted
+ # immediately after stubs are created. After a sufficient period of time
+ # has passed for all users to be trusted to hang on to their channels
+ # for as long as they are in use and to close them after using them,
+ # then deletion of this grpc._channel.Channel instance can be made to
+ # effect closure of the underlying cygrpc.Channel instance.
_moot(self._connectivity_state)
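
The CleanupThread machinery removed above is replaced throughout this file with plain daemon threads; a minimal sketch of the pattern now used for the request-consumption and channel-spin threads (the worker body is a placeholder):

    import threading

    def _spin():
        pass  # e.g. drain a request iterator or poll the channel's completion queue

    worker = threading.Thread(target=_spin)
    worker.daemon = True  # do not block interpreter shutdown on this thread
    worker.start()
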
diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py
index bbb69ad489..862987a0cd 100644
--- a/src/python/grpcio/grpc/_common.py
+++ b/src/python/grpcio/grpc/_common.py
@@ -14,8 +14,6 @@
"""Shared implementation."""
import logging
-import threading
-import time
import six
@@ -101,35 +99,3 @@ def deserialize(serialized_message, deserializer):
def fully_qualified_method(group, method):
return '/{}/{}'.format(group, method)
-
-
-class CleanupThread(threading.Thread):
- """A threading.Thread subclass supporting custom behavior on join().
-
- On Python Interpreter exit, Python will attempt to join outstanding threads
- prior to garbage collection. We may need to do additional cleanup, and
- we accomplish this by overriding the join() method.
- """
-
- def __init__(self, behavior, *args, **kwargs):
- """Constructor.
-
- Args:
- behavior (function): Function called on join() with a single
- argument, timeout, indicating the maximum duration of
- `behavior`, or None indicating `behavior` has no deadline.
- `behavior` must be idempotent.
- args: Positional arguments passed to threading.Thread constructor.
- kwargs: Keyword arguments passed to threading.Thread constructor.
- """
- super(CleanupThread, self).__init__(*args, **kwargs)
- self._behavior = behavior
-
- def join(self, timeout=None):
- start_time = time.time()
- self._behavior(timeout)
- end_time = time.time()
- if timeout is not None:
- timeout -= end_time - start_time
- timeout = max(timeout, 0)
- super(CleanupThread, self).join(timeout)
diff --git a/src/python/grpcio/grpc/_cython/.gitignore b/src/python/grpcio/grpc/_cython/.gitignore
index 306e3ad277..b9936e932c 100644
--- a/src/python/grpcio/grpc/_cython/.gitignore
+++ b/src/python/grpcio/grpc/_cython/.gitignore
@@ -1,4 +1,4 @@
-cygrpc.c
+cygrpc.cpp
*.a
*.so
*.dll
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index 0892215b6d..2e02111ddd 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -30,9 +30,12 @@ cdef class Call:
tag, operations, self if retain_self else None)
batch_operation_tag.prepare()
cpython.Py_INCREF(batch_operation_tag)
- return grpc_call_start_batch(
+ cdef grpc_call_error error
+ with nogil:
+ error = grpc_call_start_batch(
self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops,
<cpython.PyObject *>batch_operation_tag, NULL)
+ return error
def start_client_batch(self, operations, tag):
# We don't reference this call in the operations tag because
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
index 1ba76b7f83..eefc685c0b 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
@@ -13,9 +13,59 @@
# limitations under the License.
+cdef _check_call_error_no_metadata(c_call_error)
+
+
+cdef _check_and_raise_call_error_no_metadata(c_call_error)
+
+
+cdef _check_call_error(c_call_error, metadata)
+
+
+cdef class _CallState:
+
+ cdef grpc_call *c_call
+ cdef set due
+
+
+cdef class _ChannelState:
+
+ cdef object condition
+ cdef grpc_channel *c_channel
+ # A boolean field indicating that the channel is open (if True) or is being
+ # closed (i.e. a call to close is currently executing) or is closed (if
+ # False).
+ # TODO(https://github.com/grpc/grpc/issues/3064): Eliminate "is being closed"
+ # a state in which condition may be acquired by any thread, eliminate this
+ # field and just use the NULLness of c_channel as an indication that the
+ # channel is closed.
+ cdef object open
+
+ # A dict from _BatchOperationTag to _CallState
+ cdef dict integrated_call_states
+ cdef grpc_completion_queue *c_call_completion_queue
+
+ # A set of _CallState
+ cdef set segregated_call_states
+
+ cdef set connectivity_due
+ cdef grpc_completion_queue *c_connectivity_completion_queue
+
+
+cdef class IntegratedCall:
+
+ cdef _ChannelState _channel_state
+ cdef _CallState _call_state
+
+
+cdef class SegregatedCall:
+
+ cdef _ChannelState _channel_state
+ cdef _CallState _call_state
+ cdef grpc_completion_queue *_c_completion_queue
+
+
cdef class Channel:
cdef grpc_arg_pointer_vtable _vtable
- cdef grpc_channel *c_channel
- cdef list references
- cdef readonly _ArgumentsProcessor _arguments_processor
+ cdef _ChannelState _state
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index a3966497bc..72e74e84ae 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -14,82 +14,439 @@
cimport cpython
+import threading
+
+_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
+ 'Internal gRPC call error %d. ' +
+ 'Please report to https://github.com/grpc/grpc/issues')
+
+
+cdef str _call_error_metadata(metadata):
+ return 'metadata was invalid: %s' % metadata
+
+
+cdef str _call_error_no_metadata(c_call_error):
+ return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
+
+
+cdef str _call_error(c_call_error, metadata):
+ if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
+ return _call_error_metadata(metadata)
+ else:
+ return _call_error_no_metadata(c_call_error)
+
+
+cdef _check_call_error_no_metadata(c_call_error):
+ if c_call_error != GRPC_CALL_OK:
+ return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error
+ else:
+ return None
+
+
+cdef _check_and_raise_call_error_no_metadata(c_call_error):
+ error = _check_call_error_no_metadata(c_call_error)
+ if error is not None:
+ raise ValueError(error)
+
+
+cdef _check_call_error(c_call_error, metadata):
+ if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
+ return _call_error_metadata(metadata)
+ else:
+ return _check_call_error_no_metadata(c_call_error)
+
+
+cdef void _raise_call_error_no_metadata(c_call_error) except *:
+ raise ValueError(_call_error_no_metadata(c_call_error))
+
+
+cdef void _raise_call_error(c_call_error, metadata) except *:
+ raise ValueError(_call_error(c_call_error, metadata))
+
+
+cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue):
+ grpc_completion_queue_shutdown(c_completion_queue)
+ grpc_completion_queue_destroy(c_completion_queue)
+
+
+cdef class _CallState:
+
+ def __cinit__(self):
+ self.due = set()
+
+
+cdef class _ChannelState:
+
+ def __cinit__(self):
+ self.condition = threading.Condition()
+ self.open = True
+ self.integrated_call_states = {}
+ self.segregated_call_states = set()
+ self.connectivity_due = set()
+
+
+cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
+ cdef grpc_call_error c_call_error
+ cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None)
+ tag.prepare()
+ cpython.Py_INCREF(tag)
+ with nogil:
+ c_call_error = grpc_call_start_batch(
+ c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL)
+ return c_call_error, tag
+
+
+cdef object _operate_from_integrated_call(
+ _ChannelState channel_state, _CallState call_state, object operations,
+ object user_tag):
+ cdef grpc_call_error c_call_error
+ cdef _BatchOperationTag tag
+ with channel_state.condition:
+ if call_state.due:
+ c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
+ if c_call_error == GRPC_CALL_OK:
+ call_state.due.add(tag)
+ channel_state.integrated_call_states[tag] = call_state
+ return True
+ else:
+ _raise_call_error_no_metadata(c_call_error)
+ else:
+ return False
+
+
+cdef object _operate_from_segregated_call(
+ _ChannelState channel_state, _CallState call_state, object operations,
+ object user_tag):
+ cdef grpc_call_error c_call_error
+ cdef _BatchOperationTag tag
+ with channel_state.condition:
+ if call_state.due:
+ c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
+ if c_call_error == GRPC_CALL_OK:
+ call_state.due.add(tag)
+ return True
+ else:
+ _raise_call_error_no_metadata(c_call_error)
+ else:
+ return False
+
+
+cdef _cancel(
+ _ChannelState channel_state, _CallState call_state, grpc_status_code code,
+ str details):
+ cdef grpc_call_error c_call_error
+ with channel_state.condition:
+ if call_state.due:
+ c_call_error = grpc_call_cancel_with_status(
+ call_state.c_call, code, _encode(details), NULL)
+ _check_and_raise_call_error_no_metadata(c_call_error)
+
+
+cdef BatchOperationEvent _next_call_event(
+ _ChannelState channel_state, grpc_completion_queue *c_completion_queue,
+ on_success):
+ tag, event = _latent_event(c_completion_queue, None)
+ with channel_state.condition:
+ on_success(tag)
+ channel_state.condition.notify_all()
+ return event
+
+
+# TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler.
+cdef void _call(
+ _ChannelState channel_state, _CallState call_state,
+ grpc_completion_queue *c_completion_queue, on_success, int flags, method,
+ host, object deadline, CallCredentials credentials,
+ object operationses_and_user_tags, object metadata) except *:
+ """Invokes an RPC.
+
+ Args:
+ channel_state: A _ChannelState with its "open" attribute set to True. RPCs
+ may not be invoked on a closed channel.
+ call_state: An empty _CallState to be altered (specifically assigned a
+ c_call and having its due set populated) if the RPC invocation is
+ successful.
+ c_completion_queue: A grpc_completion_queue to be used for the call's
+ operations.
+ on_success: A behavior to be called if attempting to start operations for
+ the call succeeds. If called the behavior will be called while holding the
+ channel_state condition and passed the tags associated with operations
+ that were successfully started for the call.
+ flags: Flags to be passed to gRPC Core as part of call creation.
+ method: The fully-qualified name of the RPC method being invoked.
+ host: A "host" string to be passed to gRPC Core as part of call creation.
+ deadline: A float for the deadline of the RPC, or None if the RPC is to have
+ no deadline.
+ credentials: A _CallCredentials for the RPC or None.
+ operationses_and_user_tags: A sequence of length-two sequences the first
+ element of which is a sequence of Operations and the second element of
+ which is an object to be used as a tag. A SendInitialMetadataOperation
+ must be present in the first element of this value.
+ metadata: The metadata for this call.
+ """
+ cdef grpc_slice method_slice
+ cdef grpc_slice host_slice
+ cdef grpc_slice *host_slice_ptr
+ cdef grpc_call_credentials *c_call_credentials
+ cdef grpc_call_error c_call_error
+ cdef tuple error_and_wrapper_tag
+ cdef _BatchOperationTag wrapper_tag
+ with channel_state.condition:
+ if channel_state.open:
+ method_slice = _slice_from_bytes(method)
+ if host is None:
+ host_slice_ptr = NULL
+ else:
+ host_slice = _slice_from_bytes(host)
+ host_slice_ptr = &host_slice
+ call_state.c_call = grpc_channel_create_call(
+ channel_state.c_channel, NULL, flags,
+ c_completion_queue, method_slice, host_slice_ptr,
+ _timespec_from_time(deadline), NULL)
+ grpc_slice_unref(method_slice)
+ if host_slice_ptr:
+ grpc_slice_unref(host_slice)
+ if credentials is not None:
+ c_call_credentials = credentials.c()
+ c_call_error = grpc_call_set_credentials(
+ call_state.c_call, c_call_credentials)
+ grpc_call_credentials_release(c_call_credentials)
+ if c_call_error != GRPC_CALL_OK:
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+ _raise_call_error_no_metadata(c_call_error)
+ started_tags = set()
+ for operations, user_tag in operationses_and_user_tags:
+ c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
+ if c_call_error == GRPC_CALL_OK:
+ started_tags.add(tag)
+ else:
+ grpc_call_cancel(call_state.c_call, NULL)
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+ _raise_call_error(c_call_error, metadata)
+ else:
+ call_state.due.update(started_tags)
+ on_success(started_tags)
+ else:
+ raise ValueError('Cannot invoke RPC on closed channel!')
+
+cdef void _process_integrated_call_tag(
+ _ChannelState state, _BatchOperationTag tag) except *:
+ cdef _CallState call_state = state.integrated_call_states.pop(tag)
+ call_state.due.remove(tag)
+ if not call_state.due:
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+
+
+cdef class IntegratedCall:
+
+ def __cinit__(self, _ChannelState channel_state, _CallState call_state):
+ self._channel_state = channel_state
+ self._call_state = call_state
+
+ def operate(self, operations, tag):
+ return _operate_from_integrated_call(
+ self._channel_state, self._call_state, operations, tag)
+
+ def cancel(self, code, details):
+ _cancel(self._channel_state, self._call_state, code, details)
+
+
+cdef IntegratedCall _integrated_call(
+ _ChannelState state, int flags, method, host, object deadline,
+ object metadata, CallCredentials credentials, operationses_and_user_tags):
+ call_state = _CallState()
+
+ def on_success(started_tags):
+ for started_tag in started_tags:
+ state.integrated_call_states[started_tag] = call_state
+
+ _call(
+ state, call_state, state.c_call_completion_queue, on_success, flags,
+ method, host, deadline, credentials, operationses_and_user_tags, metadata)
+
+ return IntegratedCall(state, call_state)
+
+
+cdef object _process_segregated_call_tag(
+ _ChannelState state, _CallState call_state,
+ grpc_completion_queue *c_completion_queue, _BatchOperationTag tag):
+ call_state.due.remove(tag)
+ if not call_state.due:
+ grpc_call_unref(call_state.c_call)
+ call_state.c_call = NULL
+ state.segregated_call_states.remove(call_state)
+ _destroy_c_completion_queue(c_completion_queue)
+ return True
+ else:
+ return False
+
+
+cdef class SegregatedCall:
+
+ def __cinit__(self, _ChannelState channel_state, _CallState call_state):
+ self._channel_state = channel_state
+ self._call_state = call_state
+
+ def operate(self, operations, tag):
+ return _operate_from_segregated_call(
+ self._channel_state, self._call_state, operations, tag)
+
+ def cancel(self, code, details):
+ _cancel(self._channel_state, self._call_state, code, details)
+
+ def next_event(self):
+ def on_success(tag):
+ _process_segregated_call_tag(
+ self._channel_state, self._call_state, self._c_completion_queue, tag)
+ return _next_call_event(
+ self._channel_state, self._c_completion_queue, on_success)
+
+
+cdef SegregatedCall _segregated_call(
+ _ChannelState state, int flags, method, host, object deadline,
+ object metadata, CallCredentials credentials, operationses_and_user_tags):
+ cdef _CallState call_state = _CallState()
+ cdef grpc_completion_queue *c_completion_queue = (
+ grpc_completion_queue_create_for_next(NULL))
+ cdef SegregatedCall segregated_call
+
+ def on_success(started_tags):
+ state.segregated_call_states.add(call_state)
+
+ try:
+ _call(
+ state, call_state, c_completion_queue, on_success, flags, method, host,
+ deadline, credentials, operationses_and_user_tags, metadata)
+ except:
+ _destroy_c_completion_queue(c_completion_queue)
+ raise
+
+ segregated_call = SegregatedCall(state, call_state)
+ segregated_call._c_completion_queue = c_completion_queue
+ return segregated_call
+
+
+cdef object _watch_connectivity_state(
+ _ChannelState state, grpc_connectivity_state last_observed_state,
+ object deadline):
+ cdef _ConnectivityTag tag = _ConnectivityTag(object())
+ with state.condition:
+ if state.open:
+ cpython.Py_INCREF(tag)
+ grpc_channel_watch_connectivity_state(
+ state.c_channel, last_observed_state, _timespec_from_time(deadline),
+ state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
+ state.connectivity_due.add(tag)
+ else:
+ raise ValueError('Cannot invoke RPC on closed channel!')
+ completed_tag, event = _latent_event(
+ state.c_connectivity_completion_queue, None)
+ with state.condition:
+ state.connectivity_due.remove(completed_tag)
+ state.condition.notify_all()
+ return event
+
+
+cdef _close(_ChannelState state, grpc_status_code code, object details):
+ cdef _CallState call_state
+ encoded_details = _encode(details)
+ with state.condition:
+ if state.open:
+ state.open = False
+ for call_state in set(state.integrated_call_states.values()):
+ grpc_call_cancel_with_status(
+ call_state.c_call, code, encoded_details, NULL)
+ for call_state in state.segregated_call_states:
+ grpc_call_cancel_with_status(
+ call_state.c_call, code, encoded_details, NULL)
+ # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
+ # watching.
+
+ while state.integrated_call_states:
+ state.condition.wait()
+ while state.segregated_call_states:
+ state.condition.wait()
+ while state.connectivity_due:
+ state.condition.wait()
+
+ _destroy_c_completion_queue(state.c_call_completion_queue)
+ _destroy_c_completion_queue(state.c_connectivity_completion_queue)
+ grpc_channel_destroy(state.c_channel)
+ state.c_channel = NULL
+ grpc_shutdown()
+ state.condition.notify_all()
+ else:
+ # Another call to close already completed in the past or is currently
+ # being executed in another thread.
+ while state.c_channel != NULL:
+ state.condition.wait()
+
cdef class Channel:
- def __cinit__(self, bytes target, object arguments,
- ChannelCredentials channel_credentials=None):
+ def __cinit__(
+ self, bytes target, object arguments,
+ ChannelCredentials channel_credentials):
grpc_init()
+ self._state = _ChannelState()
self._vtable.copy = &_copy_pointer
self._vtable.destroy = &_destroy_pointer
self._vtable.cmp = &_compare_pointer
cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor(
arguments)
cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable)
- self.references = []
- c_target = target
if channel_credentials is None:
- self.c_channel = grpc_insecure_channel_create(c_target, c_arguments, NULL)
+ self._state.c_channel = grpc_insecure_channel_create(
+ <char *>target, c_arguments, NULL)
else:
c_channel_credentials = channel_credentials.c()
- self.c_channel = grpc_secure_channel_create(
- c_channel_credentials, c_target, c_arguments, NULL)
+ self._state.c_channel = grpc_secure_channel_create(
+ c_channel_credentials, <char *>target, c_arguments, NULL)
grpc_channel_credentials_release(c_channel_credentials)
- arguments_processor.un_c()
- self.references.append(target)
- self.references.append(arguments)
-
- def create_call(self, Call parent, int flags,
- CompletionQueue queue not None,
- method, host, object deadline):
- if queue.is_shutting_down:
- raise ValueError("queue must not be shutting down or shutdown")
- cdef grpc_slice method_slice = _slice_from_bytes(method)
- cdef grpc_slice host_slice
- cdef grpc_slice *host_slice_ptr = NULL
- if host is not None:
- host_slice = _slice_from_bytes(host)
- host_slice_ptr = &host_slice
- cdef Call operation_call = Call()
- operation_call.references = [self, queue]
- cdef grpc_call *parent_call = NULL
- if parent is not None:
- parent_call = parent.c_call
- operation_call.c_call = grpc_channel_create_call(
- self.c_channel, parent_call, flags,
- queue.c_completion_queue, method_slice, host_slice_ptr,
- _timespec_from_time(deadline), NULL)
- grpc_slice_unref(method_slice)
- if host_slice_ptr:
- grpc_slice_unref(host_slice)
- return operation_call
+ self._state.c_call_completion_queue = (
+ grpc_completion_queue_create_for_next(NULL))
+ self._state.c_connectivity_completion_queue = (
+ grpc_completion_queue_create_for_next(NULL))
+
+ def target(self):
+ cdef char *c_target
+ with self._state.condition:
+ c_target = grpc_channel_get_target(self._state.c_channel)
+ target = <bytes>c_target
+ gpr_free(c_target)
+ return target
+
+ def integrated_call(
+ self, int flags, method, host, object deadline, object metadata,
+ CallCredentials credentials, operationses_and_tags):
+ return _integrated_call(
+ self._state, flags, method, host, deadline, metadata, credentials,
+ operationses_and_tags)
+
+ def next_call_event(self):
+ def on_success(tag):
+ _process_integrated_call_tag(self._state, tag)
+ return _next_call_event(
+ self._state, self._state.c_call_completion_queue, on_success)
+
+ def segregated_call(
+ self, int flags, method, host, object deadline, object metadata,
+ CallCredentials credentials, operationses_and_tags):
+ return _segregated_call(
+ self._state, flags, method, host, deadline, metadata, credentials,
+ operationses_and_tags)
def check_connectivity_state(self, bint try_to_connect):
- cdef grpc_connectivity_state result
- with nogil:
- result = grpc_channel_check_connectivity_state(self.c_channel,
- try_to_connect)
- return result
+ with self._state.condition:
+ return grpc_channel_check_connectivity_state(
+ self._state.c_channel, try_to_connect)
def watch_connectivity_state(
- self, grpc_connectivity_state last_observed_state,
- object deadline, CompletionQueue queue not None, tag):
- cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag)
- cpython.Py_INCREF(connectivity_tag)
- grpc_channel_watch_connectivity_state(
- self.c_channel, last_observed_state, _timespec_from_time(deadline),
- queue.c_completion_queue, <cpython.PyObject *>connectivity_tag)
+ self, grpc_connectivity_state last_observed_state, object deadline):
+ return _watch_connectivity_state(self._state, last_observed_state, deadline)
- def target(self):
- cdef char *target = NULL
- with nogil:
- target = grpc_channel_get_target(self.c_channel)
- result = <bytes>target
- with nogil:
- gpr_free(target)
- return result
-
- def __dealloc__(self):
- if self.c_channel != NULL:
- grpc_channel_destroy(self.c_channel)
- grpc_shutdown()
+ def close(self, code, details):
+ _close(self._state, code, details)
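
For orientation, the operationses_and_tags argument accepted by the new integrated_call and segregated_call entry points is a tuple of (operations, tag) pairs, one pair per batch to start on the call. A hedged sketch mirroring _stream_unary_invocation_operationses_and_tags from _channel.py (the metadata value and the _EMPTY_FLAGS constant are illustrative assumptions):

    from grpc._cython import cygrpc

    _EMPTY_FLAGS = 0  # assumed value of the module-level constant in _channel.py
    metadata = (('client-id', 'example'),)  # illustrative metadata

    operationses_and_tags = (
        (
            (
                cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            None,  # tag: None for blocking (segregated) calls, an event handler otherwise
        ),
        ((cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None),
    )
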
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
index 5ea0287b81..9f06ce086e 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
@@ -13,10 +13,16 @@
# limitations under the License.
+cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline)
+
+
+cdef _interpret_event(grpc_event c_event)
+
+
cdef class CompletionQueue:
cdef grpc_completion_queue *c_completion_queue
cdef bint is_shutting_down
cdef bint is_shutdown
- cdef _interpret_event(self, grpc_event event)
+ cdef _interpret_event(self, grpc_event c_event)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 40496d1124..a2d765546a 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -20,6 +20,53 @@ import time
cdef int _INTERRUPT_CHECK_PERIOD_MS = 200
+cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline):
+ cdef gpr_timespec c_increment
+ cdef gpr_timespec c_timeout
+ cdef gpr_timespec c_deadline
+ c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
+ if deadline is None:
+ c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
+ else:
+ c_deadline = _timespec_from_time(deadline)
+
+ with nogil:
+ while True:
+ c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
+ if gpr_time_cmp(c_timeout, c_deadline) > 0:
+ c_timeout = c_deadline
+ c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL)
+ if (c_event.type != GRPC_QUEUE_TIMEOUT or
+ gpr_time_cmp(c_timeout, c_deadline) == 0):
+ break
+
+ # Handle any signals
+ with gil:
+ cpython.PyErr_CheckSignals()
+ return c_event
+
+
+cdef _interpret_event(grpc_event c_event):
+ cdef _Tag tag
+ if c_event.type == GRPC_QUEUE_TIMEOUT:
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
+ elif c_event.type == GRPC_QUEUE_SHUTDOWN:
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None)
+ else:
+ tag = <_Tag>c_event.tag
+ # We receive event tags only after they've been inc-ref'd elsewhere in
+ # the code.
+ cpython.Py_DECREF(tag)
+ return tag, tag.event(c_event)
+
+
+cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline):
+ cdef grpc_event c_event = _next(c_completion_queue, deadline)
+ return _interpret_event(c_event)
+
+
cdef class CompletionQueue:
def __cinit__(self, shutdown_cq=False):
@@ -36,48 +83,16 @@ cdef class CompletionQueue:
self.is_shutting_down = False
self.is_shutdown = False
- cdef _interpret_event(self, grpc_event event):
- cdef _Tag tag = None
- if event.type == GRPC_QUEUE_TIMEOUT:
- # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
- return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
- elif event.type == GRPC_QUEUE_SHUTDOWN:
+ cdef _interpret_event(self, grpc_event c_event):
+ unused_tag, event = _interpret_event(c_event)
+ if event.completion_type == GRPC_QUEUE_SHUTDOWN:
self.is_shutdown = True
- # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
- return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None)
- else:
- tag = <_Tag>event.tag
- # We receive event tags only after they've been inc-ref'd elsewhere in
- # the code.
- cpython.Py_DECREF(tag)
- return tag.event(event)
+ return event
+ # We name this 'poll' to avoid problems with CPython's expectations for
+ # 'special' methods (like next and __next__).
def poll(self, deadline=None):
- # We name this 'poll' to avoid problems with CPython's expectations for
- # 'special' methods (like next and __next__).
- cdef gpr_timespec c_increment
- cdef gpr_timespec c_timeout
- cdef gpr_timespec c_deadline
- if deadline is None:
- c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
- else:
- c_deadline = _timespec_from_time(deadline)
- with nogil:
- c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
-
- while True:
- c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
- if gpr_time_cmp(c_timeout, c_deadline) > 0:
- c_timeout = c_deadline
- event = grpc_completion_queue_next(
- self.c_completion_queue, c_timeout, NULL)
- if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0:
- break;
-
- # Handle any signals
- with gil:
- cpython.PyErr_CheckSignals()
- return self._interpret_event(event)
+ return self._interpret_event(_next(self.c_completion_queue, deadline))
def shutdown(self):
with nogil:
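
The polling loop factored out into _next above preserves the existing CompletionQueue.poll behavior: it blocks for the next event but wakes every _INTERRUPT_CHECK_PERIOD_MS (200 ms) so that Python signal handlers such as KeyboardInterrupt still run. A small sketch of calling it with a deadline:

    import time
    from grpc._cython import cygrpc

    completion_queue = cygrpc.CompletionQueue()
    # Blocks for up to roughly one second; returns a queue-timeout event if nothing completes.
    event = completion_queue.poll(deadline=time.time() + 1.0)
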
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd
new file mode 100644
index 0000000000..f5688d08cd
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd
@@ -0,0 +1,152 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# distutils: language=c++
+
+from libc.stdint cimport uint32_t
+
+cdef extern from "grpc/impl/codegen/slice.h":
+ struct grpc_slice_buffer:
+ int count
+
+cdef extern from "src/core/lib/iomgr/error.h":
+ struct grpc_error:
+ pass
+
+cdef extern from "src/core/lib/iomgr/gevent_util.h":
+ grpc_error* grpc_socket_error(char* error)
+ char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i)
+ int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i)
+
+cdef extern from "src/core/lib/iomgr/sockaddr.h":
+ ctypedef struct grpc_sockaddr:
+ pass
+
+cdef extern from "src/core/lib/iomgr/resolve_address.h":
+ ctypedef struct grpc_resolved_addresses:
+ size_t naddrs
+ grpc_resolved_address* addrs
+
+ ctypedef struct grpc_resolved_address:
+ char[128] addr
+ size_t len
+
+cdef extern from "src/core/lib/iomgr/resolve_address_custom.h":
+ struct grpc_custom_resolver:
+ pass
+
+ struct grpc_custom_resolver_vtable:
+ grpc_error* (*resolve)(char* host, char* port, grpc_resolved_addresses** res);
+ void (*resolve_async)(grpc_custom_resolver* resolver, char* host, char* port);
+
+ void grpc_custom_resolve_callback(grpc_custom_resolver* resolver,
+ grpc_resolved_addresses* result,
+ grpc_error* error);
+
+cdef extern from "src/core/lib/iomgr/tcp_custom.h":
+ struct grpc_custom_socket:
+ void* impl
+ # We don't care about the rest of the fields
+ ctypedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket,
+ grpc_error* error)
+ ctypedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket,
+ grpc_error* error)
+ ctypedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket,
+ size_t nread, grpc_error* error)
+ ctypedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket,
+ grpc_custom_socket* client,
+ grpc_error* error)
+ ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket)
+
+ struct grpc_socket_vtable:
+ grpc_error* (*init)(grpc_custom_socket* socket, int domain);
+ void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
+ size_t len, grpc_custom_connect_callback cb);
+ void (*destroy)(grpc_custom_socket* socket);
+ void (*shutdown)(grpc_custom_socket* socket);
+ void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb);
+ void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices,
+ grpc_custom_write_callback cb);
+ void (*read)(grpc_custom_socket* socket, char* buffer, size_t length,
+ grpc_custom_read_callback cb);
+ grpc_error* (*getpeername)(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr, int* len);
+ grpc_error* (*getsockname)(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr, int* len);
+ grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
+ size_t len, int flags);
+ grpc_error* (*listen)(grpc_custom_socket* socket);
+ void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client,
+ grpc_custom_accept_callback cb);
+
+cdef extern from "src/core/lib/iomgr/timer_custom.h":
+ struct grpc_custom_timer:
+ void* timer
+ int timeout_ms
+ # We don't care about the rest of the fields
+
+ struct grpc_custom_timer_vtable:
+ void (*start)(grpc_custom_timer* t);
+ void (*stop)(grpc_custom_timer* t);
+
+ void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error);
+
+cdef extern from "src/core/lib/iomgr/pollset_custom.h":
+ struct grpc_custom_poller_vtable:
+ void (*init)()
+ void (*poll)(size_t timeout_ms)
+ void (*kick)()
+ void (*shutdown)()
+
+cdef extern from "src/core/lib/iomgr/iomgr_custom.h":
+ void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
+ grpc_custom_resolver_vtable* resolver,
+ grpc_custom_timer_vtable* timer,
+ grpc_custom_poller_vtable* poller);
+
+cdef extern from "src/core/lib/iomgr/sockaddr_utils.h":
+ int grpc_sockaddr_get_port(const grpc_resolved_address *addr);
+ int grpc_sockaddr_to_string(char **out, const grpc_resolved_address *addr,
+ int normalize);
+ void grpc_string_to_sockaddr(grpc_resolved_address *out, char* addr, int port);
+ int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr,
+ int port)
+ const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* resolved_addr)
+
+
+cdef class TimerWrapper:
+
+ cdef grpc_custom_timer *c_timer
+ cdef object timer
+ cdef object event
+
+cdef class SocketWrapper:
+ cdef object sockopts
+ cdef object socket
+ cdef object closed
+ cdef grpc_custom_socket *c_socket
+ cdef char* c_buffer
+ cdef size_t len
+ cdef grpc_custom_socket *accepting_socket
+
+ cdef grpc_custom_connect_callback connect_cb
+ cdef grpc_custom_write_callback write_cb
+ cdef grpc_custom_read_callback read_cb
+ cdef grpc_custom_accept_callback accept_cb
+ cdef grpc_custom_close_callback close_cb
+
+
+cdef class ResolveWrapper:
+ cdef grpc_custom_resolver *c_resolver
+ cdef char* c_host
+ cdef char* c_port
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx
new file mode 100644
index 0000000000..31ef671aed
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx
@@ -0,0 +1,448 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# distutils: language=c++
+
+cimport cpython
+from libc cimport string
+from libc.stdlib cimport malloc, free
+import errno
+gevent_g = None
+gevent_socket = None
+gevent_hub = None
+gevent_event = None
+g_event = None
+g_pool = None
+
+cdef grpc_error* grpc_error_none():
+ return <grpc_error*>0
+
+cdef grpc_error* socket_error(str syscall, str err):
+ error_str = "{} failed: {}".format(syscall, err)
+ error_bytes = str_to_bytes(error_str)
+ return grpc_socket_error(error_bytes)
+
+cdef resolved_addr_to_tuple(grpc_resolved_address* address):
+ cdef char* res_str
+ port = grpc_sockaddr_get_port(address)
+ str_len = grpc_sockaddr_to_string(&res_str, address, 0)
+ byte_str = _decode(<bytes>res_str[:str_len])
+ if byte_str.endswith(':' + str(port)):
+ byte_str = byte_str[:(0 - len(str(port)) - 1)]
+ byte_str = byte_str.lstrip('[')
+ byte_str = byte_str.rstrip(']')
+ byte_str = '{}'.format(byte_str)
+ return byte_str, port
+
+cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length):
+ cdef grpc_resolved_address c_addr
+ string.memcpy(<void*>c_addr.addr, <void*> address, length)
+ c_addr.len = length
+ return resolved_addr_to_tuple(&c_addr)
+
+cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length):
+ cdef grpc_resolved_address c_addr
+ string.memcpy(<void*>c_addr.addr, <void*> address, length)
+ c_addr.len = length
+ return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4'
+
+cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups):
+ cdef grpc_resolved_addresses* addresses
+ tups_set = set((tup[4][0], tup[4][1]) for tup in tups)
+ addresses = <grpc_resolved_addresses*> malloc(sizeof(grpc_resolved_addresses))
+ addresses.naddrs = len(tups_set)
+ addresses.addrs = <grpc_resolved_address*> malloc(sizeof(grpc_resolved_address) * len(tups_set))
+ i = 0
+ for tup in set(tups_set):
+ hostname = str_to_bytes(tup[0])
+ grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1])
+ i += 1
+ return addresses
+
+def _spawn_greenlet(*args):
+ greenlet = g_pool.spawn(*args)
+
+###############################
+### socket implementation ###
+###############################
+
+cdef class SocketWrapper:
+ def __cinit__(self):
+ self.sockopts = []
+ self.socket = None
+ self.c_socket = NULL
+ self.c_buffer = NULL
+ self.len = 0
+
+cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil:
+ sw = SocketWrapper()
+ sw.c_socket = socket
+ sw.sockopts = []
+ cpython.Py_INCREF(sw)
+ # Python doesn't support AF_UNSPEC sockets, so we defer creation until
+ # bind/connect when we know what type of socket we need
+ sw.socket = None
+ sw.closed = False
+ sw.accepting_socket = NULL
+ socket.impl = <void*>sw
+ return grpc_error_none()
+
+cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple):
+ try:
+ socket_wrapper.socket.connect(addr_tuple)
+ socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
+ grpc_error_none())
+ except IOError as io_error:
+ socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
+ socket_error("connect", str(io_error)))
+ g_event.set()
+
+def socket_connect_async(socket_wrapper, addr_tuple):
+ socket_connect_async_cython(socket_wrapper, addr_tuple)
+
+cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr,
+ size_t addr_len,
+ grpc_custom_connect_callback cb) with gil:
+ py_socket = None
+ socket_wrapper = <SocketWrapper>socket.impl
+ socket_wrapper.connect_cb = cb
+ addr_tuple = sockaddr_to_tuple(addr, addr_len)
+ if sockaddr_is_ipv4(addr, addr_len):
+ py_socket = gevent_socket.socket(gevent_socket.AF_INET)
+ else:
+ py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
+ applysockopts(py_socket)
+ socket_wrapper.socket = py_socket
+ _spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple)
+
+cdef void socket_destroy(grpc_custom_socket* socket) with gil:
+ cpython.Py_DECREF(<SocketWrapper>socket.impl)
+
+cdef void socket_shutdown(grpc_custom_socket* socket) with gil:
+ try:
+ (<SocketWrapper>socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR)
+ except IOError as io_error:
+ if io_error.errno != errno.ENOTCONN:
+ raise io_error
+
+cdef void socket_close(grpc_custom_socket* socket,
+ grpc_custom_close_callback cb) with gil:
+ socket_wrapper = (<SocketWrapper>socket.impl)
+ if socket_wrapper.socket is not None:
+ socket_wrapper.socket.close()
+ socket_wrapper.closed = True
+ socket_wrapper.close_cb = cb
+ # Delay the close callback until the accept() call has picked it up
+ if socket_wrapper.accepting_socket != NULL:
+ return
+ socket_wrapper.close_cb(socket)
+
+def socket_sendmsg(socket, write_bytes):
+ try:
+ return socket.sendmsg(write_bytes)
+ except AttributeError:
+ # sendmsg not available on all Pythons/Platforms
+ return socket.send(b''.join(write_bytes))
+
+cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes):
+ try:
+ while write_bytes:
+ sent_byte_count = socket_sendmsg(socket_wrapper.socket, write_bytes)
+ while sent_byte_count > 0:
+ if sent_byte_count < len(write_bytes[0]):
+ write_bytes[0] = write_bytes[0][sent_byte_count:]
+ sent_byte_count = 0
+ else:
+ sent_byte_count -= len(write_bytes[0])
+ write_bytes = write_bytes[1:]
+ socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
+ grpc_error_none())
+ except IOError as io_error:
+ socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
+ socket_error("send", str(io_error)))
+ g_event.set()
+
+def socket_write_async(socket_wrapper, write_bytes):
+ socket_write_async_cython(socket_wrapper, write_bytes)
+
+cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer,
+ grpc_custom_write_callback cb) with gil:
+ cdef char* start
+ sw = <SocketWrapper>socket.impl
+ sw.write_cb = cb
+ write_bytes = []
+ for i in range(buffer.count):
+ start = grpc_slice_buffer_start(buffer, i)
+ length = grpc_slice_buffer_length(buffer, i)
+ write_bytes.append(<bytes>start[:length])
+ _spawn_greenlet(socket_write_async, <SocketWrapper>socket.impl, write_bytes)
+
+cdef socket_read_async_cython(SocketWrapper socket_wrapper):
+ cdef char* buff_char_arr
+ try:
+ buff_str = socket_wrapper.socket.recv(socket_wrapper.len)
+ buff_char_arr = buff_str
+ string.memcpy(<void*>socket_wrapper.c_buffer, buff_char_arr, len(buff_str))
+ socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
+ len(buff_str), grpc_error_none())
+ except IOError as io_error:
+ socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
+ -1, socket_error("recv", str(io_error)))
+ g_event.set()
+
+def socket_read_async(socket_wrapper):
+ socket_read_async_cython(socket_wrapper)
+
+cdef void socket_read(grpc_custom_socket* socket, char* buffer,
+ size_t length, grpc_custom_read_callback cb) with gil:
+ sw = <SocketWrapper>socket.impl
+ sw.read_cb = cb
+ sw.c_buffer = buffer
+ sw.len = length
+ _spawn_greenlet(socket_read_async, sw)
+
+cdef grpc_error* socket_getpeername(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr,
+ int* length) with gil:
+ cdef char* src_buf
+ peer = (<SocketWrapper>socket.impl).socket.getpeername()
+
+ cdef grpc_resolved_address c_addr
+ hostname = str_to_bytes(peer[0])
+ grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
+ string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
+ length[0] = c_addr.len
+ return grpc_error_none()
+
+cdef grpc_error* socket_getsockname(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr,
+ int* length) with gil:
+ cdef char* src_buf
+ cdef grpc_resolved_address c_addr
+ if (<SocketWrapper>socket.impl).socket is None:
+ peer = ('0.0.0.0', 0)
+ else:
+ peer = (<SocketWrapper>socket.impl).socket.getsockname()
+ hostname = str_to_bytes(peer[0])
+ grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
+ string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
+ length[0] = c_addr.len
+ return grpc_error_none()
+
+def applysockopts(s):
+ s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1)
+ s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True)
+
+cdef grpc_error* socket_bind(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr,
+ size_t len, int flags) with gil:
+ addr_tuple = sockaddr_to_tuple(addr, len)
+ try:
+ try:
+ py_socket = gevent_socket.socket(gevent_socket.AF_INET)
+ applysockopts(py_socket)
+ py_socket.bind(addr_tuple)
+ except gevent_socket.gaierror as e:
+ py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
+ applysockopts(py_socket)
+ py_socket.bind(addr_tuple)
+ (<SocketWrapper>socket.impl).socket = py_socket
+ except IOError as io_error:
+ return socket_error("bind", str(io_error))
+ else:
+ return grpc_error_none()
+
+cdef grpc_error* socket_listen(grpc_custom_socket* socket) with gil:
+ (<SocketWrapper>socket.impl).socket.listen(50)
+ return grpc_error_none()
+
+cdef void accept_callback_cython(SocketWrapper s):
+ try:
+ conn, address = s.socket.accept()
+ sw = SocketWrapper()
+ sw.closed = False
+ sw.c_socket = s.accepting_socket
+ sw.sockopts = []
+ sw.socket = conn
+ sw.c_socket.impl = <void*>sw
+ sw.accepting_socket = NULL
+ cpython.Py_INCREF(sw)
+ s.accepting_socket = NULL
+ s.accept_cb(<grpc_custom_socket*>s.c_socket, sw.c_socket, grpc_error_none())
+ except IOError as io_error:
+ # TODO: propagate the actual accept error
+ s.accepting_socket = NULL
+ s.accept_cb(<grpc_custom_socket*>s.c_socket, s.accepting_socket,
+ socket_error("accept", str(io_error)))
+ if s.closed:
+ s.close_cb(<grpc_custom_socket*>s.c_socket)
+ g_event.set()
+
+def socket_accept_async(s):
+ accept_callback_cython(s)
+
+cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client,
+ grpc_custom_accept_callback cb) with gil:
+ sw = <SocketWrapper>socket.impl
+ sw.accepting_socket = client
+ sw.accept_cb = cb
+ _spawn_greenlet(socket_accept_async, sw)
+
+#####################################
+######Resolver implementation #######
+#####################################
+
+cdef class ResolveWrapper:
+ def __cinit__(self):
+ self.c_resolver = NULL
+ self.c_host = NULL
+ self.c_port = NULL
+
+cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper):
+ try:
+ res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port)
+ grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
+ tuples_to_resolvaddr(res), grpc_error_none())
+ except IOError as io_error:
+ grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
+ <grpc_resolved_addresses*>0,
+ socket_error("getaddrinfo", str(io_error)))
+ g_event.set()
+
+def socket_resolve_async_python(resolve_wrapper):
+ socket_resolve_async_cython(resolve_wrapper)
+
+cdef void socket_resolve_async(grpc_custom_resolver* r, char* host, char* port) with gil:
+ rw = ResolveWrapper()
+ rw.c_resolver = r
+ rw.c_host = host
+ rw.c_port = port
+ _spawn_greenlet(socket_resolve_async_python, rw)
+
+cdef grpc_error* socket_resolve(char* host, char* port,
+ grpc_resolved_addresses** res) with gil:
+ try:
+ result = gevent_socket.getaddrinfo(host, port)
+ res[0] = tuples_to_resolvaddr(result)
+ return grpc_error_none()
+ except IOError as io_error:
+ return socket_error("getaddrinfo", str(io_error))
+
+###############################
+### timer implementation ######
+###############################
+
+cdef class TimerWrapper:
+ def __cinit__(self, deadline):
+ self.timer = gevent_hub.get_hub().loop.timer(deadline)
+ self.event = None
+
+ def start(self):
+ self.event = gevent_event.Event()
+ self.timer.start(self.on_finish)
+
+ def on_finish(self):
+ grpc_custom_timer_callback(self.c_timer, grpc_error_none())
+ self.timer.stop()
+ g_event.set()
+
+ def stop(self):
+ self.event.set()
+ self.timer.stop()
+
+cdef void timer_start(grpc_custom_timer* t) with gil:
+ timer = TimerWrapper(t.timeout_ms / 1000.0)
+ timer.c_timer = t
+ t.timer = <void*>timer
+ timer.start()
+
+cdef void timer_stop(grpc_custom_timer* t) with gil:
+ time_wrapper = <object>t.timer
+ time_wrapper.stop()
+
+###############################
+### pollset implementation ###
+###############################
+
+cdef void init_loop() with gil:
+ pass
+
+cdef void destroy_loop() with gil:
+ g_pool.join()
+
+cdef void kick_loop() with gil:
+ g_event.set()
+
+cdef void run_loop(size_t timeout_ms) with gil:
+ timeout = timeout_ms / 1000.0
+ if timeout_ms > 0:
+ g_event.wait(timeout)
+ g_event.clear()
+
+###############################
+### Initializer ###############
+###############################
+
+cdef grpc_socket_vtable gevent_socket_vtable
+cdef grpc_custom_resolver_vtable gevent_resolver_vtable
+cdef grpc_custom_timer_vtable gevent_timer_vtable
+cdef grpc_custom_poller_vtable gevent_pollset_vtable
+
+def init_grpc_gevent():
+ # Lazily import gevent
+ global gevent_socket
+ global gevent_g
+ global gevent_hub
+ global gevent_event
+ global g_event
+ global g_pool
+ import gevent
+ gevent_g = gevent
+ import gevent.socket
+ gevent_socket = gevent.socket
+ import gevent.hub
+ gevent_hub = gevent.hub
+ import gevent.event
+ gevent_event = gevent.event
+ import gevent.pool
+
+ g_event = gevent.event.Event()
+ g_pool = gevent.pool.Group()
+ gevent_resolver_vtable.resolve = socket_resolve
+ gevent_resolver_vtable.resolve_async = socket_resolve_async
+
+ gevent_socket_vtable.init = socket_init
+ gevent_socket_vtable.connect = socket_connect
+ gevent_socket_vtable.destroy = socket_destroy
+ gevent_socket_vtable.shutdown = socket_shutdown
+ gevent_socket_vtable.close = socket_close
+ gevent_socket_vtable.write = socket_write
+ gevent_socket_vtable.read = socket_read
+ gevent_socket_vtable.getpeername = socket_getpeername
+ gevent_socket_vtable.getsockname = socket_getsockname
+ gevent_socket_vtable.bind = socket_bind
+ gevent_socket_vtable.listen = socket_listen
+ gevent_socket_vtable.accept = socket_accept
+
+ gevent_timer_vtable.start = timer_start
+ gevent_timer_vtable.stop = timer_stop
+
+ gevent_pollset_vtable.init = init_loop
+ gevent_pollset_vtable.poll = run_loop
+ gevent_pollset_vtable.kick = kick_loop
+ gevent_pollset_vtable.shutdown = destroy_loop
+
+ grpc_custom_iomgr_init(&gevent_socket_vtable,
+ &gevent_resolver_vtable,
+ &gevent_timer_vtable,
+ &gevent_pollset_vtable)
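The vtable implementation above follows one dispatch pattern throughout: each blocking operation is handed to a greenlet from a shared pool, its completion callback fires from that greenlet, and the custom poller is woken through a shared event. A simplified, pure-Python sketch of that pattern (helper names are illustrative, not part of this change):

import gevent.event
import gevent.pool

pool = gevent.pool.Group()        # analogous to g_pool
wake = gevent.event.Event()       # analogous to g_event

def run_async(blocking_fn, on_done):
    # Spawn the blocking work on a greenlet, report completion, then kick the poller.
    def worker():
        try:
            on_done(blocking_fn(), None)
        except IOError as io_error:
            on_done(None, io_error)
        wake.set()                # like g_event.set() after each callback
    pool.spawn(worker)            # like _spawn_greenlet(...)

def poll(timeout_seconds):
    # Like run_loop(): block until some operation kicks the event, then reset it.
    if timeout_seconds > 0:
        wake.wait(timeout_seconds)
    wake.clear()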
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd
index b6a794c6d7..c8ace7c3cc 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pxd
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+# distutils: language=c++
include "_cygrpc/grpc.pxi"
@@ -27,3 +28,5 @@ include "_cygrpc/security.pxd.pxi"
include "_cygrpc/server.pxd.pxi"
include "_cygrpc/tag.pxd.pxi"
include "_cygrpc/time.pxd.pxi"
+
+include "_cygrpc/grpc_gevent.pxd"
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index 2ee2e6b73e..f5f08fc983 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+# distutils: language=c++
cimport cpython
@@ -35,6 +36,8 @@ include "_cygrpc/server.pyx.pxi"
include "_cygrpc/tag.pyx.pxi"
include "_cygrpc/time.pyx.pxi"
+include "_cygrpc/grpc_gevent.pyx"
+
#
# initialize gRPC
#
diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py
index 4a69d859fc..ad53f60ad3 100644
--- a/src/python/grpcio/grpc/_grpcio_metadata.py
+++ b/src/python/grpcio/grpc/_grpcio_metadata.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!!
-__version__ = """1.11.0.dev0"""
+__version__ = """1.13.0.dev0"""
diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py
index d029472c68..f465e35a9c 100644
--- a/src/python/grpcio/grpc/_interceptor.py
+++ b/src/python/grpcio/grpc/_interceptor.py
@@ -334,6 +334,19 @@ class _Channel(grpc.Channel):
else:
return thunk(method)
+ def _close(self):
+ self._channel.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._close()
+ return False
+
+ def close(self):
+ self._channel.close()
+
def intercept_channel(channel, *interceptors):
for interceptor in reversed(list(interceptors)):
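Because the interceptor channel now exposes close(), __enter__() and __exit__(), it can be driven with a with statement just like the underlying channel. A hedged usage sketch (the target address is a placeholder):

import grpc

# Any interceptor wrapped via grpc.intercept_channel(channel, interceptor)
# yields the _Channel above, so it shares this context-manager behavior.
with grpc.insecure_channel('localhost:50051') as channel:
    pass  # issue RPCs here; __exit__ calls close() when the block is left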
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index c988e0c87c..d849cadbee 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -780,14 +780,8 @@ def _start(state):
state.stage = _ServerStage.STARTED
_request_call(state)
- def cleanup_server(timeout):
- if timeout is None:
- _stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait()
- else:
- _stop(state, timeout).wait()
-
- thread = _common.CleanupThread(
- cleanup_server, target=_serve, args=(state,))
+ thread = threading.Thread(target=_serve, args=(state,))
+ thread.daemon = True
thread.start()
diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py
index 3c04fd7639..ccafec8951 100644
--- a/src/python/grpcio/grpc/beta/_server_adaptations.py
+++ b/src/python/grpcio/grpc/beta/_server_adaptations.py
@@ -168,11 +168,8 @@ def _run_request_pipe_thread(request_iterator, request_consumer,
return
request_consumer.terminate()
- def stop_request_pipe(timeout): # pylint: disable=unused-argument
- thread_joined.set()
-
- request_pipe_thread = _common.CleanupThread(
- stop_request_pipe, target=pipe_requests)
+ request_pipe_thread = threading.Thread(target=pipe_requests)
+ request_pipe_thread.daemon = True
request_pipe_thread.start()
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py b/src/python/grpcio/grpc/experimental/__init__.py
index 5fb4f3c3cf..dcec322b69 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py
+++ b/src/python/grpcio/grpc/experimental/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2015 gRPC authors.
+# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -11,3 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""gRPC's experimental APIs.
+
+These APIs are subject to removal in any minor version release.
+"""
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py b/src/python/grpcio/grpc/experimental/gevent.py
index 6eb7ba33f6..159d612b4e 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py
+++ b/src/python/grpcio/grpc/experimental/gevent.py
@@ -1,4 +1,4 @@
-# Copyright 2015 gRPC authors.
+# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -11,11 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-"""A test constant working around issue 3069."""
+"""gRPC's Python gEvent APIs."""
-# test_constants is referenced from specification in this module.
-from tests.unit.framework.common import test_constants # pylint: disable=unused-import
+from grpc._cython import cygrpc as _cygrpc
-# TODO(issue 3069): Replace uses of this constant with
-# test_constants.SHORT_TIMEOUT.
-REALLY_SHORT_TIMEOUT = 0.1
+
+def init_gevent():
+ """Patches gRPC's libraries to be compatible with gevent.
+
+ This must be called AFTER the Python standard library has been patched,
+ but BEFORE creating any gRPC objects.
+
+ In order for progress to be made, the application must drive the event loop.
+ """
+ _cygrpc.init_grpc_gevent()
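The docstring above prescribes a strict ordering, and the new TestGevent command in grpcio_tests/commands.py follows the same sequence; a minimal usage sketch (the channel target is illustrative):

from gevent import monkey
monkey.patch_all()                      # 1. patch the standard library first

import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()               # 2. route gRPC's I/O through gevent

import grpc
channel = grpc.insecure_channel('localhost:50051')  # 3. only now create gRPC objects

The same steps back the test_gevent setuptools command registered in grpcio_tests/setup.py.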
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index c1654358a3..bf6c2534a8 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -15,6 +15,9 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_core_dependencies.py.template`!!!
CORE_SOURCE_FILES = [
+ 'third_party/address_sorting/address_sorting.c',
+ 'third_party/address_sorting/address_sorting_posix.c',
+ 'third_party/address_sorting/address_sorting_windows.c',
'src/core/lib/gpr/alloc.cc',
'src/core/lib/gpr/arena.cc',
'src/core/lib/gpr/atm.cc',
@@ -60,10 +63,13 @@ CORE_SOURCE_FILES = [
'src/core/lib/channel/channel_args.cc',
'src/core/lib/channel/channel_stack.cc',
'src/core/lib/channel/channel_stack_builder.cc',
+ 'src/core/lib/channel/channel_trace.cc',
+ 'src/core/lib/channel/channel_trace_registry.cc',
'src/core/lib/channel/connected_channel.cc',
'src/core/lib/channel/handshaker.cc',
'src/core/lib/channel/handshaker_factory.cc',
'src/core/lib/channel/handshaker_registry.cc',
+ 'src/core/lib/channel/status_util.cc',
'src/core/lib/compression/compression.cc',
'src/core/lib/compression/compression_internal.cc',
'src/core/lib/compression/message_compress.cc',
@@ -97,6 +103,8 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/gethostname_sysconf.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
+ 'src/core/lib/iomgr/iomgr_custom.cc',
+ 'src/core/lib/iomgr/iomgr_internal.cc',
'src/core/lib/iomgr/iomgr_posix.cc',
'src/core/lib/iomgr/iomgr_uv.cc',
'src/core/lib/iomgr/iomgr_windows.cc',
@@ -105,12 +113,16 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/lockfree_event.cc',
'src/core/lib/iomgr/network_status_tracker.cc',
'src/core/lib/iomgr/polling_entity.cc',
- 'src/core/lib/iomgr/pollset_set_uv.cc',
+ 'src/core/lib/iomgr/pollset.cc',
+ 'src/core/lib/iomgr/pollset_custom.cc',
+ 'src/core/lib/iomgr/pollset_set.cc',
+ 'src/core/lib/iomgr/pollset_set_custom.cc',
'src/core/lib/iomgr/pollset_set_windows.cc',
'src/core/lib/iomgr/pollset_uv.cc',
'src/core/lib/iomgr/pollset_windows.cc',
+ 'src/core/lib/iomgr/resolve_address.cc',
+ 'src/core/lib/iomgr/resolve_address_custom.cc',
'src/core/lib/iomgr/resolve_address_posix.cc',
- 'src/core/lib/iomgr/resolve_address_uv.cc',
'src/core/lib/iomgr/resolve_address_windows.cc',
'src/core/lib/iomgr/resource_quota.cc',
'src/core/lib/iomgr/sockaddr_utils.cc',
@@ -122,19 +134,24 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/socket_utils_uv.cc',
'src/core/lib/iomgr/socket_utils_windows.cc',
'src/core/lib/iomgr/socket_windows.cc',
+ 'src/core/lib/iomgr/tcp_client.cc',
+ 'src/core/lib/iomgr/tcp_client_custom.cc',
'src/core/lib/iomgr/tcp_client_posix.cc',
- 'src/core/lib/iomgr/tcp_client_uv.cc',
'src/core/lib/iomgr/tcp_client_windows.cc',
+ 'src/core/lib/iomgr/tcp_custom.cc',
'src/core/lib/iomgr/tcp_posix.cc',
+ 'src/core/lib/iomgr/tcp_server.cc',
+ 'src/core/lib/iomgr/tcp_server_custom.cc',
'src/core/lib/iomgr/tcp_server_posix.cc',
'src/core/lib/iomgr/tcp_server_utils_posix_common.cc',
'src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc',
'src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.cc',
- 'src/core/lib/iomgr/tcp_server_uv.cc',
'src/core/lib/iomgr/tcp_server_windows.cc',
'src/core/lib/iomgr/tcp_uv.cc',
'src/core/lib/iomgr/tcp_windows.cc',
'src/core/lib/iomgr/time_averaged_stats.cc',
+ 'src/core/lib/iomgr/timer.cc',
+ 'src/core/lib/iomgr/timer_custom.cc',
'src/core/lib/iomgr/timer_generic.cc',
'src/core/lib/iomgr/timer_heap.cc',
'src/core/lib/iomgr/timer_manager.cc',
@@ -279,9 +296,9 @@ CORE_SOURCE_FILES = [
'third_party/nanopb/pb_decode.c',
'third_party/nanopb/pb_encode.c',
'src/core/tsi/transport_security.cc',
- 'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/client/insecure/channel_create.cc',
'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc',
+ 'src/core/ext/transport/chttp2/client/authority.cc',
'src/core/ext/transport/chttp2/client/chttp2_connector.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
@@ -301,13 +318,15 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/resolver.cc',
'src/core/ext/filters/client_channel/resolver_registry.cc',
'src/core/ext/filters/client_channel/retry_throttle.cc',
- 'src/core/ext/filters/client_channel/status_util.cc',
'src/core/ext/filters/client_channel/subchannel.cc',
'src/core/ext/filters/client_channel/subchannel_index.cc',
'src/core/ext/filters/client_channel/uri_parser.cc',
'src/core/ext/filters/deadline/deadline_filter.cc',
'src/core/tsi/alts_transport_security.cc',
'src/core/tsi/fake_transport_security.cc',
+ 'src/core/tsi/ssl/session_cache/ssl_session_boringssl.cc',
+ 'src/core/tsi/ssl/session_cache/ssl_session_cache.cc',
+ 'src/core/tsi/ssl/session_cache/ssl_session_openssl.cc',
'src/core/tsi/ssl_transport_security.cc',
'src/core/tsi/transport_security_grpc.cc',
'src/core/ext/transport/chttp2/server/chttp2_server.cc',
@@ -324,7 +343,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
- 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',
@@ -337,6 +355,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/census/grpc_context.cc',
'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
+ 'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc',
'src/core/ext/filters/workarounds/workaround_utils.cc',
'src/core/plugin_registry/grpc_plugin_registry.cc',
@@ -409,7 +428,6 @@ CORE_SOURCE_FILES = [
'third_party/boringssl/crypto/cpu-intel.c',
'third_party/boringssl/crypto/cpu-ppc64le.c',
'third_party/boringssl/crypto/crypto.c',
- 'third_party/boringssl/crypto/curve25519/curve25519.c',
'third_party/boringssl/crypto/curve25519/spake25519.c',
'third_party/boringssl/crypto/curve25519/x25519-x86_64.c',
'third_party/boringssl/crypto/dh/check.c',
@@ -595,6 +613,7 @@ CORE_SOURCE_FILES = [
'third_party/boringssl/ssl/tls13_server.cc',
'third_party/boringssl/ssl/tls_method.cc',
'third_party/boringssl/ssl/tls_record.cc',
+ 'third_party/boringssl/third_party/fiat/curve25519.c',
'third_party/zlib/adler32.c',
'third_party/zlib/compress.c',
'third_party/zlib/crc32.c',
diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py
index 32e82493f3..57dc26dbeb 100644
--- a/src/python/grpcio/grpc_version.py
+++ b/src/python/grpcio/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!!
-VERSION = '1.11.0.dev0'
+VERSION = '1.13.0.dev0'
diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py
index ad4c85cc12..ba0d4a3b6d 100644
--- a/src/python/grpcio_health_checking/grpc_version.py
+++ b/src/python/grpcio_health_checking/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!!
-VERSION = '1.11.0.dev0'
+VERSION = '1.13.0.dev0'
diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py
index 60d309ec65..35c09827ba 100644
--- a/src/python/grpcio_health_checking/setup.py
+++ b/src/python/grpcio_health_checking/setup.py
@@ -57,7 +57,7 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
- 'protobuf>=3.5.0.post1',
+ 'protobuf>=3.5.2.post1',
'grpcio>={version}'.format(version=grpc_version.VERSION),
)
diff --git a/src/python/grpcio_reflection/README.rst b/src/python/grpcio_reflection/README.rst
new file mode 100644
index 0000000000..da99a44904
--- /dev/null
+++ b/src/python/grpcio_reflection/README.rst
@@ -0,0 +1,10 @@
+gRPC Python Reflection package
+==============================
+
+Reference package for reflection in gRPC Python.
+
+Dependencies
+------------
+
+Depends on the `grpcio` package, available from PyPI via `pip install grpcio`.
+
diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py
index 6322d847b1..ea2878d9ee 100644
--- a/src/python/grpcio_reflection/grpc_version.py
+++ b/src/python/grpcio_reflection/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!!
-VERSION = '1.11.0.dev0'
+VERSION = '1.13.0.dev0'
diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py
index 10c4c38f19..589d0ff556 100644
--- a/src/python/grpcio_reflection/setup.py
+++ b/src/python/grpcio_reflection/setup.py
@@ -58,7 +58,7 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
- 'protobuf>=3.5.0.post1',
+ 'protobuf>=3.5.2.post1',
'grpcio>={version}'.format(version=grpc_version.VERSION),
)
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py
index b015b8d738..0c1941e6be 100644
--- a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py
+++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py
@@ -56,6 +56,21 @@ class TestingChannel(grpc_testing.Channel):
response_deserializer=None):
return _multi_callable.StreamStream(method, self._state)
+ def _close(self):
+ # TODO(https://github.com/grpc/grpc/issues/12531): Decide what
+ # action to take here, if any.
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._close()
+ return False
+
+ def close(self):
+ self._close()
+
def take_unary_unary(self, method_descriptor):
return _channel_rpc.unary_unary(self._state, method_descriptor)
diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py
index 1e75fea12e..02f19f2283 100644
--- a/src/python/grpcio_testing/grpc_version.py
+++ b/src/python/grpcio_testing/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!!
-VERSION = '1.11.0.dev0'
+VERSION = '1.13.0.dev0'
diff --git a/src/python/grpcio_testing/setup.py b/src/python/grpcio_testing/setup.py
index 5a9d593ec1..eb480a5464 100644
--- a/src/python/grpcio_testing/setup.py
+++ b/src/python/grpcio_testing/setup.py
@@ -29,7 +29,7 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
- 'protobuf>=3.5.0.post1',
+ 'protobuf>=3.5.2.post1',
'grpcio>={version}'.format(version=grpc_version.VERSION),
)
diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py
index 93f84572b7..42e01c18d3 100644
--- a/src/python/grpcio_tests/commands.py
+++ b/src/python/grpcio_tests/commands.py
@@ -108,6 +108,57 @@ class TestLite(setuptools.Command):
self.distribution.fetch_build_eggs(self.distribution.tests_require)
+class TestGevent(setuptools.Command):
+ """Command to run tests w/gevent."""
+
+ BANNED_TESTS = (
+ # These tests send a lot of RPCs and are really slow on gevent. They will
+ # eventually succeed, but we still need to dig into the performance issues.
+ 'unit._cython._no_messages_server_completion_queue_per_call_test.Test.test_rpcs',
+ 'unit._cython._no_messages_single_server_completion_queue_test.Test.test_rpcs',
+ # It is unclear why this test fails under gevent; it should not even be
+ # exercising the c-core.
+ 'testing._client_test.ClientTest.test_infinite_request_stream_real_time',
+ # TODO(https://github.com/grpc/grpc/issues/14789) enable this test
+ 'unit._server_ssl_cert_config_test',
+ # TODO(https://github.com/grpc/grpc/issues/14901) enable this test
+ 'protoc_plugin._python_plugin_test.PythonPluginTest',
+ # Beta API is unsupported for gevent
+ 'protoc_plugin.beta_python_plugin_test',
+ 'unit.beta._beta_features_test',
+ )
+ description = 'run tests with gevent. Assumes grpc/gevent are installed'
+ user_options = []
+
+ def initialize_options(self):
+ pass
+
+ def finalize_options(self):
+ # distutils requires this override.
+ pass
+
+ def run(self):
+ from gevent import monkey
+ monkey.patch_all()
+
+ import tests
+
+ import grpc.experimental.gevent
+ grpc.experimental.gevent.init_gevent()
+
+ import gevent
+
+ import tests
+ loader = tests.Loader()
+ loader.loadTestsFromNames(['tests'])
+ runner = tests.Runner()
+ runner.skip_tests(self.BANNED_TESTS)
+ result = gevent.spawn(runner.run, loader.suite)
+ result.join()
+ if not result.value.wasSuccessful():
+ sys.exit('Test failure')
+
+
class RunInterop(test.test):
description = 'run interop test client/server'
diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py
index 0cd7bd257f..9d2e41644e 100644
--- a/src/python/grpcio_tests/grpc_version.py
+++ b/src/python/grpcio_tests/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!!
-VERSION = '1.11.0.dev0'
+VERSION = '1.13.0.dev0'
diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py
index 250df65803..98ac19d188 100644
--- a/src/python/grpcio_tests/setup.py
+++ b/src/python/grpcio_tests/setup.py
@@ -41,7 +41,7 @@ INSTALL_REQUIRES = (
'grpcio>={version}'.format(version=grpc_version.VERSION),
'grpcio-tools>={version}'.format(version=grpc_version.VERSION),
'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION),
- 'oauth2client>=1.4.7', 'protobuf>=3.5.0.post1', 'six>=1.10',
+ 'oauth2client>=1.4.7', 'protobuf>=3.5.2.post1', 'six>=1.10',
'google-auth>=1.0.0', 'requests>=2.14.2')
COMMAND_CLASS = {
@@ -50,7 +50,8 @@ COMMAND_CLASS = {
'build_package_protos': grpc_tools.command.BuildPackageProtos,
'build_py': commands.BuildPy,
'run_interop': commands.RunInterop,
- 'test_lite': commands.TestLite
+ 'test_lite': commands.TestLite,
+ 'test_gevent': commands.TestGevent,
}
PACKAGE_DATA = {
diff --git a/src/python/grpcio_tests/tests/_loader.py b/src/python/grpcio_tests/tests/_loader.py
index 31680916b4..be0af64646 100644
--- a/src/python/grpcio_tests/tests/_loader.py
+++ b/src/python/grpcio_tests/tests/_loader.py
@@ -54,7 +54,7 @@ class Loader(object):
for module in modules:
try:
package_paths = module.__path__
- except:
+ except AttributeError:
continue
self.walk_packages(package_paths)
coverage_context.stop()
diff --git a/src/python/grpcio_tests/tests/_result.py b/src/python/grpcio_tests/tests/_result.py
index 9907c4e1f9..b105f18e78 100644
--- a/src/python/grpcio_tests/tests/_result.py
+++ b/src/python/grpcio_tests/tests/_result.py
@@ -46,7 +46,7 @@ class CaseResult(
None.
"""
- class Kind:
+ class Kind(object):
UNTESTED = 'untested'
RUNNING = 'running'
ERROR = 'error'
@@ -257,7 +257,7 @@ class CoverageResult(AugmentedResult):
#coverage.Coverage().combine()
-class _Colors:
+class _Colors(object):
"""Namespaced constants for terminal color magic numbers."""
HEADER = '\033[95m'
INFO = '\033[94m'
diff --git a/src/python/grpcio_tests/tests/_runner.py b/src/python/grpcio_tests/tests/_runner.py
index 8e27dc6c6d..eaaa027e61 100644
--- a/src/python/grpcio_tests/tests/_runner.py
+++ b/src/python/grpcio_tests/tests/_runner.py
@@ -117,6 +117,12 @@ class AugmentedCase(collections.namedtuple('AugmentedCase', ['case', 'id'])):
class Runner(object):
+ def __init__(self):
+ self._skipped_tests = []
+
+ def skip_tests(self, tests):
+ self._skipped_tests = tests
+
def run(self, suite):
"""See setuptools' test_runner setup argument for information."""
# only run test cases with id starting with given prefix
@@ -181,27 +187,31 @@ class Runner(object):
# Run the tests
result.startTestRun()
for augmented_case in augmented_cases:
- sys.stdout.write('Running {}\n'.format(
- augmented_case.case.id()))
- sys.stdout.flush()
- case_thread = threading.Thread(
- target=augmented_case.case.run, args=(result,))
- try:
- with stdout_pipe, stderr_pipe:
- case_thread.start()
- while case_thread.is_alive():
- check_kill_self()
- time.sleep(0)
- case_thread.join()
- except:
- # re-raise the exception after forcing the with-block to end
- raise
- result.set_output(augmented_case.case, stdout_pipe.output(),
- stderr_pipe.output())
- sys.stdout.write(result_out.getvalue())
- sys.stdout.flush()
- result_out.truncate(0)
- check_kill_self()
+ for skipped_test in self._skipped_tests:
+ if skipped_test in augmented_case.case.id():
+ break
+ else:
+ sys.stdout.write('Running {}\n'.format(
+ augmented_case.case.id()))
+ sys.stdout.flush()
+ case_thread = threading.Thread(
+ target=augmented_case.case.run, args=(result,))
+ try:
+ with stdout_pipe, stderr_pipe:
+ case_thread.start()
+ while case_thread.is_alive():
+ check_kill_self()
+ time.sleep(0)
+ case_thread.join()
+ except:
+ # re-raise the exception after forcing the with-block to end
+ raise
+ result.set_output(augmented_case.case, stdout_pipe.output(),
+ stderr_pipe.output())
+ sys.stdout.write(result_out.getvalue())
+ sys.stdout.flush()
+ result_out.truncate(0)
+ check_kill_self()
result.stopTestRun()
stdout_pipe.close()
stderr_pipe.close()
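The skip logic added above relies on Python's for/else: the else branch, which actually runs a case, executes only when the loop over skipped patterns completes without hitting break. A small standalone illustration (names are made up):

skipped_tests = ('slow_test', 'flaky_test')
case_id = 'unit.fast_test.Test.test_ok'
for skipped_test in skipped_tests:
    if skipped_test in case_id:
        break                   # matched a skip pattern: do not run this case
else:
    print('Running', case_id)   # reached only when no pattern matched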
diff --git a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
index 8d464b2d4b..ace15bea58 100644
--- a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
@@ -36,6 +36,9 @@ class InsecureIntraopTest(_intraop_test_case.IntraopTestCase,
self.stub = test_pb2_grpc.TestServiceStub(
grpc.insecure_channel('localhost:{}'.format(port)))
+ def tearDown(self):
+ self.server.stop(None)
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
index c89135998d..e27e551ecb 100644
--- a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
@@ -45,6 +45,9 @@ class SecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase):
_SERVER_HOST_OVERRIDE,
),)))
+ def tearDown(self):
+ self.server.stop(None)
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py
index 3780ed9020..698c37017f 100644
--- a/src/python/grpcio_tests/tests/interop/client.py
+++ b/src/python/grpcio_tests/tests/interop/client.py
@@ -66,10 +66,6 @@ def _args():
return parser.parse_args()
-def _application_default_credentials():
- return oauth2client_client.GoogleCredentials.get_application_default()
-
-
def _stub(args):
target = '{}:{}'.format(args.server_host, args.server_port)
if args.test_case == 'oauth2_auth_token':
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py
index 6d85f43130..00e60b43ef 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py
@@ -237,6 +237,7 @@ class PythonPluginTest(unittest.TestCase):
self.assertIsNotNone(service.servicer_methods)
self.assertIsNotNone(service.server)
self.assertIsNotNone(service.stub)
+ service.server.stop(None)
def testIncompleteServicer(self):
service = _CreateIncompleteService()
@@ -245,6 +246,7 @@ class PythonPluginTest(unittest.TestCase):
service.stub.UnaryCall(request)
self.assertIs(exception_context.exception.code(),
grpc.StatusCode.UNIMPLEMENTED)
+ service.server.stop(None)
def testUnaryCall(self):
service = _CreateService()
@@ -253,6 +255,7 @@ class PythonPluginTest(unittest.TestCase):
expected_response = service.servicer_methods.UnaryCall(
request, 'not a real context!')
self.assertEqual(expected_response, response)
+ service.server.stop(None)
def testUnaryCallFuture(self):
service = _CreateService()
@@ -264,6 +267,7 @@ class PythonPluginTest(unittest.TestCase):
expected_response = service.servicer_methods.UnaryCall(
request, 'not a real RpcContext!')
self.assertEqual(expected_response, response)
+ service.server.stop(None)
def testUnaryCallFutureExpired(self):
service = _CreateService()
@@ -276,6 +280,7 @@ class PythonPluginTest(unittest.TestCase):
self.assertIs(exception_context.exception.code(),
grpc.StatusCode.DEADLINE_EXCEEDED)
self.assertIs(response_future.code(), grpc.StatusCode.DEADLINE_EXCEEDED)
+ service.server.stop(None)
def testUnaryCallFutureCancelled(self):
service = _CreateService()
@@ -285,6 +290,7 @@ class PythonPluginTest(unittest.TestCase):
response_future.cancel()
self.assertTrue(response_future.cancelled())
self.assertIs(response_future.code(), grpc.StatusCode.CANCELLED)
+ service.server.stop(None)
def testUnaryCallFutureFailed(self):
service = _CreateService()
@@ -293,6 +299,7 @@ class PythonPluginTest(unittest.TestCase):
response_future = service.stub.UnaryCall.future(request)
self.assertIsNotNone(response_future.exception())
self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
+ service.server.stop(None)
def testStreamingOutputCall(self):
service = _CreateService()
@@ -303,6 +310,7 @@ class PythonPluginTest(unittest.TestCase):
for expected_response, response in moves.zip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
+ service.server.stop(None)
def testStreamingOutputCallExpired(self):
service = _CreateService()
@@ -314,6 +322,7 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
self.assertIs(exception_context.exception.code(),
grpc.StatusCode.DEADLINE_EXCEEDED)
+ service.server.stop(None)
def testStreamingOutputCallCancelled(self):
service = _CreateService()
@@ -324,6 +333,7 @@ class PythonPluginTest(unittest.TestCase):
with self.assertRaises(grpc.RpcError) as exception_context:
next(responses)
self.assertIs(responses.code(), grpc.StatusCode.CANCELLED)
+ service.server.stop(None)
def testStreamingOutputCallFailed(self):
service = _CreateService()
@@ -335,6 +345,7 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
self.assertIs(exception_context.exception.code(),
grpc.StatusCode.UNKNOWN)
+ service.server.stop(None)
def testStreamingInputCall(self):
service = _CreateService()
@@ -343,6 +354,7 @@ class PythonPluginTest(unittest.TestCase):
expected_response = service.servicer_methods.StreamingInputCall(
_streaming_input_request_iterator(), 'not a real RpcContext!')
self.assertEqual(expected_response, response)
+ service.server.stop(None)
def testStreamingInputCallFuture(self):
service = _CreateService()
@@ -353,6 +365,7 @@ class PythonPluginTest(unittest.TestCase):
expected_response = service.servicer_methods.StreamingInputCall(
_streaming_input_request_iterator(), 'not a real RpcContext!')
self.assertEqual(expected_response, response)
+ service.server.stop(None)
def testStreamingInputCallFutureExpired(self):
service = _CreateService()
@@ -367,6 +380,7 @@ class PythonPluginTest(unittest.TestCase):
grpc.StatusCode.DEADLINE_EXCEEDED)
self.assertIs(exception_context.exception.code(),
grpc.StatusCode.DEADLINE_EXCEEDED)
+ service.server.stop(None)
def testStreamingInputCallFutureCancelled(self):
service = _CreateService()
@@ -377,6 +391,7 @@ class PythonPluginTest(unittest.TestCase):
self.assertTrue(response_future.cancelled())
with self.assertRaises(grpc.FutureCancelledError):
response_future.result()
+ service.server.stop(None)
def testStreamingInputCallFutureFailed(self):
service = _CreateService()
@@ -385,6 +400,7 @@ class PythonPluginTest(unittest.TestCase):
_streaming_input_request_iterator())
self.assertIsNotNone(response_future.exception())
self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)
+ service.server.stop(None)
def testFullDuplexCall(self):
service = _CreateService()
@@ -394,6 +410,7 @@ class PythonPluginTest(unittest.TestCase):
for expected_response, response in moves.zip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
+ service.server.stop(None)
def testFullDuplexCallExpired(self):
request_iterator = _full_duplex_request_iterator()
@@ -405,6 +422,7 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
self.assertIs(exception_context.exception.code(),
grpc.StatusCode.DEADLINE_EXCEEDED)
+ service.server.stop(None)
def testFullDuplexCallCancelled(self):
service = _CreateService()
@@ -416,6 +434,7 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
self.assertIs(exception_context.exception.code(),
grpc.StatusCode.CANCELLED)
+ service.server.stop(None)
def testFullDuplexCallFailed(self):
request_iterator = _full_duplex_request_iterator()
@@ -426,6 +445,7 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
self.assertIs(exception_context.exception.code(),
grpc.StatusCode.UNKNOWN)
+ service.server.stop(None)
def testHalfDuplexCall(self):
service = _CreateService()
@@ -445,6 +465,7 @@ class PythonPluginTest(unittest.TestCase):
for expected_response, response in moves.zip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
+ service.server.stop(None)
def testHalfDuplexCallWedged(self):
condition = threading.Condition()
@@ -478,6 +499,7 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
self.assertIs(exception_context.exception.code(),
grpc.StatusCode.DEADLINE_EXCEEDED)
+ service.server.stop(None)
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
index ab33775ad3..e21ea0010a 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
@@ -271,6 +271,7 @@ class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)):
stub = services_module.TestServiceStub(channel)
response = stub.Call(self._messages_pb2.Request())
self.assertEqual(self._messages_pb2.Response(), response)
+ server.stop(None)
def _create_test_case_class(split_proto, protoc_style):
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
index ad0ecf0079..b46e53315e 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
@@ -329,9 +329,7 @@ class PythonPluginTest(unittest.TestCase):
_packagify(self._python_out)
- with _system_path([
- self._python_out,
- ]):
+ with _system_path([self._python_out]):
self._payload_pb2 = importlib.import_module(_PAYLOAD_PB2)
self._requests_pb2 = importlib.import_module(_REQUESTS_PB2)
self._responses_pb2 = importlib.import_module(_RESPONSES_PB2)
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py
index e6392a8b8c..0488450740 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_client.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py
@@ -22,7 +22,7 @@ from six.moves import queue
import grpc
from src.proto.grpc.testing import messages_pb2
-from src.proto.grpc.testing import services_pb2_grpc
+from src.proto.grpc.testing import benchmark_service_pb2_grpc
from tests.unit import resources
from tests.unit import test_common
@@ -58,7 +58,8 @@ class BenchmarkClient:
if config.payload_config.WhichOneof('payload') == 'simple_params':
self._generic = False
- self._stub = services_pb2_grpc.BenchmarkServiceStub(channel)
+ self._stub = benchmark_service_pb2_grpc.BenchmarkServiceStub(
+ channel)
payload = messages_pb2.Payload(
body='\0' * config.payload_config.simple_params.req_size)
self._request = messages_pb2.SimpleRequest(
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py
index bb07844491..2bd89cbbdf 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_server.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py
@@ -13,10 +13,10 @@
# limitations under the License.
from src.proto.grpc.testing import messages_pb2
-from src.proto.grpc.testing import services_pb2_grpc
+from src.proto.grpc.testing import benchmark_service_pb2_grpc
-class BenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer):
+class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer):
"""Synchronous Server implementation for the Benchmark service."""
def UnaryCall(self, request, context):
@@ -29,7 +29,8 @@ class BenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer):
yield messages_pb2.SimpleResponse(payload=payload)
-class GenericBenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer):
+class GenericBenchmarkServer(
+ benchmark_service_pb2_grpc.BenchmarkServiceServicer):
"""Generic Server implementation for the Benchmark service."""
def __init__(self, resp_size):
diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py
index 54f69db109..c33d013882 100644
--- a/src/python/grpcio_tests/tests/qps/qps_worker.py
+++ b/src/python/grpcio_tests/tests/qps/qps_worker.py
@@ -17,7 +17,7 @@ import argparse
import time
import grpc
-from src.proto.grpc.testing import services_pb2_grpc
+from src.proto.grpc.testing import worker_service_pb2_grpc
from tests.qps import worker_server
from tests.unit import test_common
@@ -26,7 +26,8 @@ from tests.unit import test_common
def run_worker_server(port):
server = test_common.test_server()
servicer = worker_server.WorkerServer()
- services_pb2_grpc.add_WorkerServiceServicer_to_server(servicer, server)
+ worker_service_pb2_grpc.add_WorkerServiceServicer_to_server(
+ servicer, server)
server.add_insecure_port('[::]:{}'.format(port))
server.start()
servicer.wait_for_quit()
diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py
index 41e2403c8f..db145fbf64 100644
--- a/src/python/grpcio_tests/tests/qps/worker_server.py
+++ b/src/python/grpcio_tests/tests/qps/worker_server.py
@@ -20,7 +20,7 @@ import time
from concurrent import futures
import grpc
from src.proto.grpc.testing import control_pb2
-from src.proto.grpc.testing import services_pb2_grpc
+from src.proto.grpc.testing import worker_service_pb2_grpc
from src.proto.grpc.testing import stats_pb2
from tests.qps import benchmark_client
@@ -31,7 +31,7 @@ from tests.unit import resources
from tests.unit import test_common
-class WorkerServer(services_pb2_grpc.WorkerServiceServicer):
+class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer):
"""Python Worker Server implementation."""
def __init__(self):
@@ -72,7 +72,7 @@ class WorkerServer(services_pb2_grpc.WorkerServiceServicer):
server = test_common.test_server(max_workers=server_threads)
if config.server_type == control_pb2.ASYNC_SERVER:
servicer = benchmark_server.BenchmarkServer()
- services_pb2_grpc.add_BenchmarkServiceServicer_to_server(
+ benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server(
servicer, server)
elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
resp_size = config.payload_config.bytebuf_params.resp_size
diff --git a/src/python/grpcio_tests/tests/stress/test_runner.py b/src/python/grpcio_tests/tests/stress/test_runner.py
index d5038e3ba2..764cda17fb 100644
--- a/src/python/grpcio_tests/tests/stress/test_runner.py
+++ b/src/python/grpcio_tests/tests/stress/test_runner.py
@@ -50,7 +50,7 @@ class TestRunner(threading.Thread):
test_case.test_interoperability(self._stub, None)
end_time = time.time()
self._histogram.add((end_time - start_time) * 1e9)
- except Exception as e:
+ except Exception as e: # pylint: disable=broad-except
traceback.print_exc()
self._exception_queue.put(
Exception("An exception occured during test {}"
diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py
index 7d0d74c8c4..3ddeba2373 100644
--- a/src/python/grpcio_tests/tests/testing/_client_application.py
+++ b/src/python/grpcio_tests/tests/testing/_client_application.py
@@ -215,30 +215,6 @@ def _run_infinite_request_stream(stub):
return _UNSATISFACTORY_OUTCOME
-def run(scenario, channel):
- stub = services_pb2_grpc.FirstServiceStub(channel)
- try:
- if scenario is Scenario.UNARY_UNARY:
- return _run_unary_unary(stub)
- elif scenario is Scenario.UNARY_STREAM:
- return _run_unary_stream(stub)
- elif scenario is Scenario.STREAM_UNARY:
- return _run_stream_unary(stub)
- elif scenario is Scenario.STREAM_STREAM:
- return _run_stream_stream(stub)
- elif scenario is Scenario.CONCURRENT_STREAM_UNARY:
- return _run_concurrent_stream_unary(stub)
- elif scenario is Scenario.CONCURRENT_STREAM_STREAM:
- return _run_concurrent_stream_stream(stub)
- elif scenario is Scenario.CANCEL_UNARY_UNARY:
- return _run_cancel_unary_unary(stub)
- elif scenario is Scenario.INFINITE_REQUEST_STREAM:
- return _run_infinite_request_stream(stub)
- except grpc.RpcError as rpc_error:
- return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(),
- rpc_error.details())
-
-
_IMPLEMENTATIONS = {
Scenario.UNARY_UNARY: _run_unary_unary,
Scenario.UNARY_STREAM: _run_unary_stream,
diff --git a/src/python/grpcio_tests/tests/testing/_server_application.py b/src/python/grpcio_tests/tests/testing/_server_application.py
index 02769ca68d..243c385daf 100644
--- a/src/python/grpcio_tests/tests/testing/_server_application.py
+++ b/src/python/grpcio_tests/tests/testing/_server_application.py
@@ -38,7 +38,7 @@ class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details('Something is wrong with your request!')
return
- yield services_pb2.Strange()
+ yield services_pb2.Strange() # pylint: disable=unreachable
def StreUn(self, request_iterator, context):
context.send_initial_metadata(((
diff --git a/src/python/grpcio_tests/tests/testing/_server_test.py b/src/python/grpcio_tests/tests/testing/_server_test.py
index 4f4abd7708..88e3a79ae5 100644
--- a/src/python/grpcio_tests/tests/testing/_server_test.py
+++ b/src/python/grpcio_tests/tests/testing/_server_test.py
@@ -21,13 +21,8 @@ import grpc_testing
from tests.testing import _application_common
from tests.testing import _application_testing_common
from tests.testing import _server_application
-from tests.testing.proto import services_pb2
-# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip.
-@unittest.skipIf(
- services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None,
- 'Fix protobuf issue 3452!')
class FirstServiceServicerTest(unittest.TestCase):
def setUp(self):
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index e033c1063f..0d94426413 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -25,6 +25,7 @@
"unit._auth_test.AccessTokenAuthMetadataPluginTest",
"unit._auth_test.GoogleCallCredentialsTest",
"unit._channel_args_test.ChannelArgsTest",
+ "unit._channel_close_test.ChannelCloseTest",
"unit._channel_connectivity_test.ChannelConnectivityTest",
"unit._channel_ready_future_test.ChannelReadyFutureTest",
"unit._compression_test.CompressionTest",
@@ -52,16 +53,9 @@
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth",
- "unit._thread_cleanup_test.CleanupThreadTest",
"unit.beta._beta_features_test.BetaFeaturesTest",
"unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
"unit.beta._connectivity_channel_test.ConnectivityStatesTest",
- "unit.beta._face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
- "unit.beta._face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
- "unit.beta._face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
- "unit.beta._face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
- "unit.beta._face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
- "unit.beta._face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"unit.beta._implementations_test.CallCredentialsTest",
"unit.beta._implementations_test.ChannelCredentialsTest",
"unit.beta._not_found_test.NotFoundTest",
diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py
index 468869a03e..8c1a30e032 100644
--- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py
+++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py
@@ -102,7 +102,8 @@ class AuthContextTest(unittest.TestCase):
self.assertIsNone(auth_data[_ID])
self.assertIsNone(auth_data[_ID_KEY])
self.assertDictEqual({
- 'transport_security_type': [b'ssl']
+ 'transport_security_type': [b'ssl'],
+ 'ssl_session_reused': [b'false'],
}, auth_data[_AUTH_CTX])
def testSecureClientCert(self):
diff --git a/src/python/grpcio_tests/tests/unit/_channel_close_test.py b/src/python/grpcio_tests/tests/unit/_channel_close_test.py
new file mode 100644
index 0000000000..af3a9ee1ee
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_channel_close_test.py
@@ -0,0 +1,185 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests server and client side compression."""
+
+import threading
+import time
+import unittest
+
+import grpc
+
+from tests.unit import test_common
+from tests.unit.framework.common import test_constants
+
+_BEAT = 0.5
+_SOME_TIME = 5
+_MORE_TIME = 10
+
+
+class _MethodHandler(grpc.RpcMethodHandler):
+
+ request_streaming = True
+ response_streaming = True
+ request_deserializer = None
+ response_serializer = None
+
+ def stream_stream(self, request_iterator, servicer_context):
+ for request in request_iterator:
+ yield request * 2
+
+
+_METHOD_HANDLER = _MethodHandler()
+
+
+class _GenericHandler(grpc.GenericRpcHandler):
+
+ def service(self, handler_call_details):
+ return _METHOD_HANDLER
+
+
+_GENERIC_HANDLER = _GenericHandler()
+
+
+class _Pipe(object):
+
+ def __init__(self, values):
+ self._condition = threading.Condition()
+ self._values = list(values)
+ self._open = True
+
+ def __iter__(self):
+ return self
+
+ def _next(self):
+ with self._condition:
+ while not self._values and self._open:
+ self._condition.wait()
+ if self._values:
+ return self._values.pop(0)
+ else:
+ raise StopIteration()
+
+ def next(self):
+ return self._next()
+
+ def __next__(self):
+ return self._next()
+
+ def add(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._condition.notify()
+
+ def close(self):
+ with self._condition:
+ self._open = False
+ self._condition.notify()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self.close()
+
+
+class ChannelCloseTest(unittest.TestCase):
+
+ def setUp(self):
+ self._server = test_common.test_server(
+ max_workers=test_constants.THREAD_CONCURRENCY)
+ self._server.add_generic_rpc_handlers((_GENERIC_HANDLER,))
+ self._port = self._server.add_insecure_port('[::]:0')
+ self._server.start()
+
+ def tearDown(self):
+ self._server.stop(None)
+
+ def test_close_immediately_after_call_invocation(self):
+ channel = grpc.insecure_channel('localhost:{}'.format(self._port))
+ multi_callable = channel.stream_stream('Meffod')
+ request_iterator = _Pipe(())
+ response_iterator = multi_callable(request_iterator)
+ channel.close()
+ request_iterator.close()
+
+ self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
+
+ def test_close_while_call_active(self):
+ channel = grpc.insecure_channel('localhost:{}'.format(self._port))
+ multi_callable = channel.stream_stream('Meffod')
+ request_iterator = _Pipe((b'abc',))
+ response_iterator = multi_callable(request_iterator)
+ next(response_iterator)
+ channel.close()
+ request_iterator.close()
+
+ self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
+
+ def test_context_manager_close_while_call_active(self):
+ with grpc.insecure_channel('localhost:{}'.format(
+ self._port)) as channel: # pylint: disable=bad-continuation
+ multi_callable = channel.stream_stream('Meffod')
+ request_iterator = _Pipe((b'abc',))
+ response_iterator = multi_callable(request_iterator)
+ next(response_iterator)
+ request_iterator.close()
+
+ self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
+
+ def test_context_manager_close_while_many_calls_active(self):
+ with grpc.insecure_channel('localhost:{}'.format(
+ self._port)) as channel: # pylint: disable=bad-continuation
+ multi_callable = channel.stream_stream('Meffod')
+ request_iterators = tuple(
+ _Pipe((b'abc',))
+ for _ in range(test_constants.THREAD_CONCURRENCY))
+ response_iterators = []
+ for request_iterator in request_iterators:
+ response_iterator = multi_callable(request_iterator)
+ next(response_iterator)
+ response_iterators.append(response_iterator)
+ for request_iterator in request_iterators:
+ request_iterator.close()
+
+ for response_iterator in response_iterators:
+ self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
+
+ def test_many_concurrent_closes(self):
+ channel = grpc.insecure_channel('localhost:{}'.format(self._port))
+ multi_callable = channel.stream_stream('Meffod')
+ request_iterator = _Pipe((b'abc',))
+ response_iterator = multi_callable(request_iterator)
+ next(response_iterator)
+ start = time.time()
+ end = start + _MORE_TIME
+
+ def sleep_some_time_then_close():
+ time.sleep(_SOME_TIME)
+ channel.close()
+
+ for _ in range(test_constants.THREAD_CONCURRENCY):
+ close_thread = threading.Thread(target=sleep_some_time_then_close)
+ close_thread.start()
+ while True:
+ request_iterator.add(b'def')
+ time.sleep(_BEAT)
+ if end < time.time():
+ break
+ request_iterator.close()
+
+ self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
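
The new test above exercises the channel shutdown API introduced by this change: an explicit close() and an equivalent context-manager form, with in-flight RPCs terminating as CANCELLED. A minimal sketch of both styles at the public API level; the address is hypothetical and no server is assumed.

    import grpc

    # Explicit close: in-flight RPCs on the channel are expected to terminate
    # with StatusCode.CANCELLED, which is what the tests above assert.
    channel = grpc.insecure_channel('localhost:50051')  # hypothetical address
    try:
        pass  # issue RPCs here
    finally:
        channel.close()

    # Context-manager form: the channel is closed when the block exits.
    with grpc.insecure_channel('localhost:50051') as channel:
        pass  # issue RPCs here
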
diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py
index 7550cd39ba..0b11f03adf 100644
--- a/src/python/grpcio_tests/tests/unit/_compression_test.py
+++ b/src/python/grpcio_tests/tests/unit/_compression_test.py
@@ -52,9 +52,9 @@ class _MethodHandler(grpc.RpcMethodHandler):
self.stream_unary = None
self.stream_stream = None
if self.request_streaming and self.response_streaming:
- self.stream_stream = lambda x, y: handle_stream(x, y)
+ self.stream_stream = handle_stream
elif not self.request_streaming and not self.response_streaming:
- self.unary_unary = lambda x, y: handle_unary(x, y)
+ self.unary_unary = handle_unary
class _GenericHandler(grpc.GenericRpcHandler):
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
index 3765ce4fb0..578a3d79ad 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
@@ -19,6 +19,7 @@ import unittest
from grpc._cython import cygrpc
from grpc.framework.foundation import logging_pool
from tests.unit.framework.common import test_constants
+from tests.unit._cython import test_utilities
_EMPTY_FLAGS = 0
_EMPTY_METADATA = ()
@@ -30,6 +31,8 @@ _RECEIVE_MESSAGE_TAG = 'receive_message'
_SERVER_COMPLETE_CALL_TAG = 'server_complete_call'
_SUCCESS_CALL_FRACTION = 1.0 / 8.0
+_SUCCESSFUL_CALLS = int(test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION)
+_UNSUCCESSFUL_CALLS = test_constants.RPC_CONCURRENCY - _SUCCESSFUL_CALLS
class _State(object):
@@ -43,7 +46,7 @@ class _State(object):
def _is_cancellation_event(event):
return (event.tag is _RECEIVE_CLOSE_ON_SERVER_TAG and
- event.batch_operations[0].received_cancelled)
+ event.batch_operations[0].cancelled())
class _Handler(object):
@@ -150,7 +153,8 @@ class CancelManyCallsTest(unittest.TestCase):
server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0')
server.start()
- channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None)
+ channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None,
+ None)
state = _State()
@@ -165,31 +169,33 @@ class CancelManyCallsTest(unittest.TestCase):
client_condition = threading.Condition()
client_due = set()
- client_completion_queue = cygrpc.CompletionQueue()
- client_driver = _QueueDriver(client_condition, client_completion_queue,
- client_due)
- client_driver.start()
with client_condition:
client_calls = []
for index in range(test_constants.RPC_CONCURRENCY):
- client_call = channel.create_call(None, _EMPTY_FLAGS,
- client_completion_queue,
- b'/twinkies', None, None)
- operations = (
- cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
- _EMPTY_FLAGS),
- cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS),
- cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
tag = 'client_complete_call_{0:04d}_tag'.format(index)
- client_call.start_client_batch(operations, tag)
+ client_call = channel.integrated_call(
+ _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA,
+ None, ((
+ (
+ cygrpc.SendInitialMetadataOperation(
+ _EMPTY_METADATA, _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'\x45\x56',
+ _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(
+ _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+ ),
+ tag,
+ ),))
client_due.add(tag)
client_calls.append(client_call)
+ client_events_future = test_utilities.SimpleFuture(
+ lambda: tuple(channel.next_call_event() for _ in range(_SUCCESSFUL_CALLS)))
+
with state.condition:
while True:
if state.parked_handlers < test_constants.THREAD_CONCURRENCY:
@@ -201,12 +207,14 @@ class CancelManyCallsTest(unittest.TestCase):
state.condition.notify_all()
break
- client_driver.events(
- test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION)
+ client_events_future.result()
with client_condition:
for client_call in client_calls:
- client_call.cancel()
+ client_call.cancel(cygrpc.StatusCode.cancelled, 'Cancelled!')
+ for _ in range(_UNSUCCESSFUL_CALLS):
+ channel.next_call_event()
+ channel.close(cygrpc.StatusCode.unknown, 'Cancelled on channel close!')
with state.condition:
server.shutdown(server_completion_queue, _SERVER_SHUTDOWN_TAG)
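
The rewrite above illustrates the new Cython-layer invocation pattern: instead of create_call() plus start_client_batch() with a caller-managed CompletionQueue and a driver thread, integrated_call() takes the operation batches and their tags up front, and completion events are drained from the channel via next_call_event(). A minimal sketch of that pattern, mirroring the test; cygrpc is gRPC Python's internal layer, so these signatures are simply the ones exercised here, and the method name is the test's placeholder.

    from grpc._cython import cygrpc

    _EMPTY_FLAGS = 0
    _EMPTY_METADATA = ()


    def start_integrated_call(channel, tag):
        # All operation batches and their tags are handed over at call creation.
        operations = (
            cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        call = channel.integrated_call(_EMPTY_FLAGS, b'/twinkies', None, None,
                                       _EMPTY_METADATA, None,
                                       ((operations, tag),))
        # Events for integrated calls come back from the channel itself,
        # not from a caller-managed completion queue.
        event = channel.next_call_event()
        return call, event
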
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py
index 7305d0fa3f..d95286071d 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py
@@ -21,25 +21,20 @@ from grpc._cython import cygrpc
from tests.unit.framework.common import test_constants
-def _channel_and_completion_queue():
- channel = cygrpc.Channel(b'localhost:54321', ())
- completion_queue = cygrpc.CompletionQueue()
- return channel, completion_queue
+def _channel():
+ return cygrpc.Channel(b'localhost:54321', (), None)
-def _connectivity_loop(channel, completion_queue):
+def _connectivity_loop(channel):
for _ in range(100):
connectivity = channel.check_connectivity_state(True)
- channel.watch_connectivity_state(connectivity,
- time.time() + 0.2, completion_queue,
- None)
- completion_queue.poll()
+ channel.watch_connectivity_state(connectivity, time.time() + 0.2)
def _create_loop_destroy():
- channel, completion_queue = _channel_and_completion_queue()
- _connectivity_loop(channel, completion_queue)
- completion_queue.shutdown()
+ channel = _channel()
+ _connectivity_loop(channel)
+ channel.close(cygrpc.StatusCode.ok, 'Channel close!')
def _in_parallel(behavior, arguments):
@@ -55,12 +50,9 @@ def _in_parallel(behavior, arguments):
class ChannelTest(unittest.TestCase):
def test_single_channel_lonely_connectivity(self):
- channel, completion_queue = _channel_and_completion_queue()
- _in_parallel(_connectivity_loop, (
- channel,
- completion_queue,
- ))
- completion_queue.shutdown()
+ channel = _channel()
+ _connectivity_loop(channel)
+ channel.close(cygrpc.StatusCode.ok, 'Channel close!')
def test_multiple_channels_lonely_connectivity(self):
_in_parallel(_create_loop_destroy, ())
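
The connectivity test above also reflects the queue-less watch API: watch_connectivity_state() now takes only the last observed state and a deadline, and channels are shut down explicitly with a status code and detail string. A minimal sketch under the same assumptions as the test (no server is listening at the address).

    import time

    from grpc._cython import cygrpc


    def poll_connectivity_once(channel):
        # try_to_connect=True asks the channel to start connecting.
        connectivity = channel.check_connectivity_state(True)
        # Blocks until the state changes or the deadline passes; no
        # completion queue is threaded through any more.
        channel.watch_connectivity_state(connectivity, time.time() + 0.2)


    channel = cygrpc.Channel(b'localhost:54321', (), None)
    poll_connectivity_once(channel)
    channel.close(cygrpc.StatusCode.ok, 'done watching connectivity')
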
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py
index 7fd3d19b4e..d8210f36f8 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_common.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py
@@ -100,7 +100,8 @@ class RpcTest(object):
self.server.register_completion_queue(self.server_completion_queue)
port = self.server.add_http2_port(b'[::]:0')
self.server.start()
- self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), [])
+ self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), [],
+ None)
self._server_shutdown_tag = 'server_shutdown_tag'
self.server_condition = threading.Condition()
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
index 7caa98f72d..8a721788f4 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
@@ -19,6 +19,7 @@ import unittest
from grpc._cython import cygrpc
from tests.unit._cython import _common
+from tests.unit._cython import test_utilities
class Test(_common.RpcTest, unittest.TestCase):
@@ -41,31 +42,27 @@ class Test(_common.RpcTest, unittest.TestCase):
server_request_call_tag,
})
- client_call = self.channel.create_call(None, _common.EMPTY_FLAGS,
- self.client_completion_queue,
- b'/twinkies', None, None)
client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
client_complete_rpc_tag = 'client_complete_rpc_tag'
- with self.client_condition:
- client_receive_initial_metadata_start_batch_result = (
- client_call.start_client_batch([
- cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
- ], client_receive_initial_metadata_tag))
- self.assertEqual(cygrpc.CallError.ok,
- client_receive_initial_metadata_start_batch_result)
- client_complete_rpc_start_batch_result = client_call.start_client_batch(
+ client_call = self.channel.integrated_call(
+ _common.EMPTY_FLAGS, b'/twinkies', None, None,
+ _common.INVOCATION_METADATA, None, [(
[
- cygrpc.SendInitialMetadataOperation(
- _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
- cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
- ], client_complete_rpc_tag)
- self.assertEqual(cygrpc.CallError.ok,
- client_complete_rpc_start_batch_result)
- self.client_driver.add_due({
+ cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
+ ],
client_receive_initial_metadata_tag,
- client_complete_rpc_tag,
- })
+ )])
+ client_call.operate([
+ cygrpc.SendInitialMetadataOperation(_common.INVOCATION_METADATA,
+ _common.EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
+ ], client_complete_rpc_tag)
+
+ client_events_future = test_utilities.SimpleFuture(
+ lambda: [
+ self.channel.next_call_event(),
+ self.channel.next_call_event(),])
server_request_call_event = self.server_driver.event_with_tag(
server_request_call_tag)
@@ -96,20 +93,23 @@ class Test(_common.RpcTest, unittest.TestCase):
server_complete_rpc_event = server_call_driver.event_with_tag(
server_complete_rpc_tag)
- client_receive_initial_metadata_event = self.client_driver.event_with_tag(
- client_receive_initial_metadata_tag)
- client_complete_rpc_event = self.client_driver.event_with_tag(
- client_complete_rpc_tag)
+ client_events = client_events_future.result()
+ if client_events[0].tag is client_receive_initial_metadata_tag:
+ client_receive_initial_metadata_event = client_events[0]
+ client_complete_rpc_event = client_events[1]
+ else:
+ client_complete_rpc_event = client_events[0]
+ client_receive_initial_metadata_event = client_events[1]
return (
_common.OperationResult(server_request_call_start_batch_result,
server_request_call_event.completion_type,
server_request_call_event.success),
_common.OperationResult(
- client_receive_initial_metadata_start_batch_result,
+ cygrpc.CallError.ok,
client_receive_initial_metadata_event.completion_type,
client_receive_initial_metadata_event.success),
- _common.OperationResult(client_complete_rpc_start_batch_result,
+ _common.OperationResult(cygrpc.CallError.ok,
client_complete_rpc_event.completion_type,
client_complete_rpc_event.success),
_common.OperationResult(
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
index 8582a39c01..47f39ebce2 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
@@ -19,6 +19,7 @@ import unittest
from grpc._cython import cygrpc
from tests.unit._cython import _common
+from tests.unit._cython import test_utilities
class Test(_common.RpcTest, unittest.TestCase):
@@ -36,28 +37,31 @@ class Test(_common.RpcTest, unittest.TestCase):
server_request_call_tag,
})
- client_call = self.channel.create_call(None, _common.EMPTY_FLAGS,
- self.client_completion_queue,
- b'/twinkies', None, None)
client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
client_complete_rpc_tag = 'client_complete_rpc_tag'
- with self.client_condition:
- client_receive_initial_metadata_start_batch_result = (
- client_call.start_client_batch([
- cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
- ], client_receive_initial_metadata_tag))
- client_complete_rpc_start_batch_result = client_call.start_client_batch(
- [
- cygrpc.SendInitialMetadataOperation(
- _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
- cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
- ], client_complete_rpc_tag)
- self.client_driver.add_due({
- client_receive_initial_metadata_tag,
- client_complete_rpc_tag,
- })
-
+ client_call = self.channel.integrated_call(
+ _common.EMPTY_FLAGS, b'/twinkies', None, None,
+ _common.INVOCATION_METADATA, None, [
+ (
+ [
+ cygrpc.SendInitialMetadataOperation(
+ _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(
+ _common.EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(
+ _common.EMPTY_FLAGS),
+ ],
+ client_complete_rpc_tag,
+ ),
+ ])
+ client_call.operate([
+ cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
+ ], client_receive_initial_metadata_tag)
+
+ client_events_future = test_utilities.SimpleFuture(
+ lambda: [
+ self.channel.next_call_event(),
+ self.channel.next_call_event(),])
server_request_call_event = self.server_driver.event_with_tag(
server_request_call_tag)
@@ -87,20 +91,19 @@ class Test(_common.RpcTest, unittest.TestCase):
server_complete_rpc_event = self.server_driver.event_with_tag(
server_complete_rpc_tag)
- client_receive_initial_metadata_event = self.client_driver.event_with_tag(
- client_receive_initial_metadata_tag)
- client_complete_rpc_event = self.client_driver.event_with_tag(
- client_complete_rpc_tag)
+ client_events = client_events_future.result()
+ client_receive_initial_metadata_event = client_events[0]
+ client_complete_rpc_event = client_events[1]
return (
_common.OperationResult(server_request_call_start_batch_result,
server_request_call_event.completion_type,
server_request_call_event.success),
_common.OperationResult(
- client_receive_initial_metadata_start_batch_result,
+ cygrpc.CallError.ok,
client_receive_initial_metadata_event.completion_type,
client_receive_initial_metadata_event.success),
- _common.OperationResult(client_complete_rpc_start_batch_result,
+ _common.OperationResult(cygrpc.CallError.ok,
client_complete_rpc_event.completion_type,
client_complete_rpc_event.success),
_common.OperationResult(
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
index bc63b54879..8a903bfaf9 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
@@ -17,6 +17,7 @@ import threading
import unittest
from grpc._cython import cygrpc
+from tests.unit._cython import test_utilities
_EMPTY_FLAGS = 0
_EMPTY_METADATA = ()
@@ -118,7 +119,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0')
server.start()
- channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set())
+ channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set(),
+ None)
server_shutdown_tag = 'server_shutdown_tag'
server_driver = _ServerDriver(server_completion_queue,
@@ -127,10 +129,6 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
client_condition = threading.Condition()
client_due = set()
- client_completion_queue = cygrpc.CompletionQueue()
- client_driver = _QueueDriver(client_condition, client_completion_queue,
- client_due)
- client_driver.start()
server_call_condition = threading.Condition()
server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
@@ -154,25 +152,28 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server_completion_queue,
server_rpc_tag)
- client_call = channel.create_call(None, _EMPTY_FLAGS,
- client_completion_queue, b'/twinkies',
- None, None)
client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
client_complete_rpc_tag = 'client_complete_rpc_tag'
- with client_condition:
- client_receive_initial_metadata_start_batch_result = (
- client_call.start_client_batch([
- cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
- ], client_receive_initial_metadata_tag))
- client_due.add(client_receive_initial_metadata_tag)
- client_complete_rpc_start_batch_result = (
- client_call.start_client_batch([
- cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
- _EMPTY_FLAGS),
- cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- ], client_complete_rpc_tag))
- client_due.add(client_complete_rpc_tag)
+ client_call = channel.segregated_call(
+ _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA, None, (
+ (
+ [
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ ],
+ client_receive_initial_metadata_tag,
+ ),
+ (
+ [
+ cygrpc.SendInitialMetadataOperation(
+ _EMPTY_METADATA, _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+ ],
+ client_complete_rpc_tag,
+ ),
+ ))
+ client_receive_initial_metadata_event_future = test_utilities.SimpleFuture(
+ client_call.next_event)
server_rpc_event = server_driver.first_event()
@@ -208,19 +209,20 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server_complete_rpc_tag)
server_call_driver.events()
- with client_condition:
- client_receive_first_message_tag = 'client_receive_first_message_tag'
- client_receive_first_message_start_batch_result = (
- client_call.start_client_batch([
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- ], client_receive_first_message_tag))
- client_due.add(client_receive_first_message_tag)
- client_receive_first_message_event = client_driver.event_with_tag(
- client_receive_first_message_tag)
+ client_receive_initial_metadata_event = client_receive_initial_metadata_event_future.result(
+ )
+
+ client_receive_first_message_tag = 'client_receive_first_message_tag'
+ client_call.operate([
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ ], client_receive_first_message_tag)
+ client_receive_first_message_event = client_call.next_event()
- client_call_cancel_result = client_call.cancel()
- client_driver.events()
+ client_call_cancel_result = client_call.cancel(
+ cygrpc.StatusCode.cancelled, 'Cancelled during test!')
+ client_complete_rpc_event = client_call.next_event()
+ channel.close(cygrpc.StatusCode.unknown, 'Channel closed!')
server.shutdown(server_completion_queue, server_shutdown_tag)
server.cancel_all_calls()
server_driver.events()
@@ -228,11 +230,6 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
self.assertEqual(cygrpc.CallError.ok, request_call_result)
self.assertEqual(cygrpc.CallError.ok,
server_send_initial_metadata_start_batch_result)
- self.assertEqual(cygrpc.CallError.ok,
- client_receive_initial_metadata_start_batch_result)
- self.assertEqual(cygrpc.CallError.ok,
- client_complete_rpc_start_batch_result)
- self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result)
self.assertIs(server_rpc_tag, server_rpc_event.tag)
self.assertEqual(cygrpc.CompletionType.operation_complete,
server_rpc_event.completion_type)
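
For comparison with the integrated-call pattern, the rewrite above uses segregated_call(): the call keeps its own event stream read via next_event(), further batches can be added later with operate(), and cancel() now carries an explicit status code and detail string. A minimal sketch mirroring the test; again these are internal cygrpc signatures as exercised here, not a public API.

    from grpc._cython import cygrpc

    _EMPTY_FLAGS = 0
    _EMPTY_METADATA = ()


    def cancel_after_first_response(channel):
        call = channel.segregated_call(
            _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA, None, ((
                [
                    cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
                                                        _EMPTY_FLAGS),
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                    cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
                ],
                'complete_rpc_tag',
            ),))
        # Additional batches may be started after the call is created...
        call.operate([cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS)],
                     'first_message_tag')
        # ...and each segregated call drains its own events.
        first_message_event = call.next_event()
        # Cancellation now requires a status code and details.
        call.cancel(cygrpc.StatusCode.cancelled, 'cancelled after first response')
        return first_message_event
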
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 9045ff58a0..724a690746 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -51,8 +51,8 @@ class TypeSmokeTest(unittest.TestCase):
del server
def testChannelUpDown(self):
- channel = cygrpc.Channel(b'[::]:0', None)
- del channel
+ channel = cygrpc.Channel(b'[::]:0', None, None)
+ channel.close(cygrpc.StatusCode.cancelled, 'Test method anyway!')
def test_metadata_plugin_call_credentials_up_down(self):
cygrpc.MetadataPluginCallCredentials(_metadata_plugin,
@@ -121,7 +121,7 @@ class ServerClientMixin(object):
client_credentials)
else:
self.client_channel = cygrpc.Channel('localhost:{}'.format(
- self.port).encode(), set())
+ self.port).encode(), set(), None)
if host_override:
self.host_argument = None # default host
self.expected_host = host_override
@@ -131,17 +131,20 @@ class ServerClientMixin(object):
self.expected_host = self.host_argument
def tearDownMixin(self):
+ self.client_channel.close(cygrpc.StatusCode.ok, 'test being torn down!')
+ del self.client_channel
del self.server
del self.client_completion_queue
del self.server_completion_queue
- def _perform_operations(self, operations, call, queue, deadline,
- description):
- """Perform the list of operations with given call, queue, and deadline.
+ def _perform_queue_operations(self, operations, call, queue, deadline,
+ description):
+ """Perform the operations with given call, queue, and deadline.
- Invocation errors are reported with as an exception with `description` in
- the message. Performs the operations asynchronously, returning a future.
- """
+ Invocation errors are reported as an exception with `description`
+ in the message. Performs the operations asynchronously, returning a
+ future.
+ """
def performer():
tag = object()
@@ -185,9 +188,6 @@ class ServerClientMixin(object):
self.assertEqual(cygrpc.CallError.ok, request_call_result)
client_call_tag = object()
- client_call = self.client_channel.create_call(
- None, 0, self.client_completion_queue, METHOD, self.host_argument,
- DEADLINE)
client_initial_metadata = (
(
CLIENT_METADATA_ASCII_KEY,
@@ -198,18 +198,24 @@ class ServerClientMixin(object):
CLIENT_METADATA_BIN_VALUE,
),
)
- client_start_batch_result = client_call.start_client_batch([
- cygrpc.SendInitialMetadataOperation(client_initial_metadata,
- _EMPTY_FLAGS),
- cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS),
- cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- ], client_call_tag)
- self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
- client_event_future = test_utilities.CompletionQueuePollFuture(
- self.client_completion_queue, DEADLINE)
+ client_call = self.client_channel.integrated_call(
+ 0, METHOD, self.host_argument, DEADLINE, client_initial_metadata,
+ None, [
+ (
+ [
+ cygrpc.SendInitialMetadataOperation(
+ client_initial_metadata, _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+ ],
+ client_call_tag,
+ ),
+ ])
+ client_event_future = test_utilities.SimpleFuture(
+ self.client_channel.next_call_event)
request_event = self.server_completion_queue.poll(deadline=DEADLINE)
self.assertEqual(cygrpc.CompletionType.operation_complete,
@@ -285,7 +291,7 @@ class ServerClientMixin(object):
self.assertEqual(5, len(server_event.batch_operations))
found_server_op_types = set()
for server_result in server_event.batch_operations:
- self.assertNotIn(client_result.type(), found_server_op_types)
+ self.assertNotIn(server_result.type(), found_server_op_types)
found_server_op_types.add(server_result.type())
if server_result.type() == cygrpc.OperationType.receive_message:
self.assertEqual(REQUEST, server_result.message())
@@ -304,66 +310,76 @@ class ServerClientMixin(object):
del client_call
del server_call
- def test6522(self):
+ def test_6522(self):
DEADLINE = time.time() + 5
DEADLINE_TOLERANCE = 0.25
METHOD = b'twinkies'
empty_metadata = ()
+ # Prologue
server_request_tag = object()
self.server.request_call(self.server_completion_queue,
self.server_completion_queue,
server_request_tag)
- client_call = self.client_channel.create_call(
- None, 0, self.client_completion_queue, METHOD, self.host_argument,
- DEADLINE)
-
- # Prologue
- def perform_client_operations(operations, description):
- return self._perform_operations(operations, client_call,
- self.client_completion_queue,
- DEADLINE, description)
-
- client_event_future = perform_client_operations([
- cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
- cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
- ], "Client prologue")
+ client_call = self.client_channel.segregated_call(
+ 0, METHOD, self.host_argument, DEADLINE, None, None, ([(
+ [
+ cygrpc.SendInitialMetadataOperation(empty_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ ],
+ object(),
+ ), (
+ [
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+ ],
+ object(),
+ )]))
+
+ client_initial_metadata_event_future = test_utilities.SimpleFuture(
+ client_call.next_event)
request_event = self.server_completion_queue.poll(deadline=DEADLINE)
server_call = request_event.call
def perform_server_operations(operations, description):
- return self._perform_operations(operations, server_call,
- self.server_completion_queue,
- DEADLINE, description)
+ return self._perform_queue_operations(operations, server_call,
+ self.server_completion_queue,
+ DEADLINE, description)
server_event_future = perform_server_operations([
cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
], "Server prologue")
- client_event_future.result() # force completion
+ client_initial_metadata_event_future.result() # force completion
server_event_future.result()
# Messaging
for _ in range(10):
- client_event_future = perform_client_operations([
+ client_call.operate([
cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], "Client message")
+ client_message_event_future = test_utilities.SimpleFuture(
+ client_call.next_event)
server_event_future = perform_server_operations([
cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], "Server receive")
- client_event_future.result() # force completion
+ client_message_event_future.result() # force completion
server_event_future.result()
# Epilogue
- client_event_future = perform_client_operations([
+ client_call.operate([
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
], "Client epilogue")
+ # One for ReceiveStatusOnClient, one for SendCloseFromClient.
+ client_events_future = test_utilities.SimpleFuture(
+ lambda: {
+ client_call.next_event(),
+ client_call.next_event(),})
server_event_future = perform_server_operations([
cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
@@ -371,7 +387,7 @@ class ServerClientMixin(object):
empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS)
], "Server epilogue")
- client_event_future.result() # force completion
+ client_events_future.result() # force completion
server_event_future.result()
diff --git a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py
index 4a00b9ef2f..7d5eaaaa84 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py
@@ -25,7 +25,7 @@ class SimpleFuture(object):
def wrapped_function():
try:
self._result = function(*args, **kwargs)
- except Exception as error:
+ except Exception as error: # pylint: disable=broad-except
self._error = error
self._result = None
@@ -41,7 +41,7 @@ class SimpleFuture(object):
self._thread.join()
if self._error:
# TODO(atash): re-raise exceptions in a way that preserves tracebacks
- raise self._error
+ raise self._error # pylint: disable=raising-bad-type
return self._result
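
SimpleFuture is the helper these tests now rely on to run blocking next_call_event()/next_event() reads on a background thread while the main thread keeps servicing the server-side completion queue; result() joins the thread and re-raises any captured exception. A small usage sketch, with the channel assumed to come from one of the tests above.

    from tests.unit._cython import test_utilities


    def drain_events_async(channel, count):
        # next_call_event() blocks, so it runs on SimpleFuture's worker thread;
        # the caller keeps driving the server side and later joins via result(),
        # which re-raises any exception the lambda hit.
        return test_utilities.SimpleFuture(
            lambda: tuple(channel.next_call_event() for _ in range(count)))
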
diff --git a/src/python/grpcio_tests/tests/unit/_exit_test.py b/src/python/grpcio_tests/tests/unit/_exit_test.py
index 6e6d9de0fb..f40f3ae07c 100644
--- a/src/python/grpcio_tests/tests/unit/_exit_test.py
+++ b/src/python/grpcio_tests/tests/unit/_exit_test.py
@@ -49,7 +49,7 @@ def cleanup_processes():
for process in processes:
try:
process.kill()
- except Exception:
+ except Exception: # pylint: disable=broad-except
pass
diff --git a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
index e683131722..ad847ae03e 100644
--- a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
+++ b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
@@ -14,7 +14,7 @@
_BEFORE_IMPORT = tuple(globals())
-from grpc import *
+from grpc import * # pylint: disable=wildcard-import
_AFTER_IMPORT = tuple(globals())
diff --git a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py
index 4edf0fc4ad..f153089a24 100644
--- a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py
+++ b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py
@@ -81,29 +81,16 @@ class InvalidMetadataTest(unittest.TestCase):
request = b'\x07\x08'
metadata = (('InVaLiD', 'UnaryRequestFutureUnaryResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
- response_future = self._unary_unary.future(request, metadata=metadata)
- with self.assertRaises(grpc.RpcError) as exception_context:
- response_future.result()
- self.assertEqual(exception_context.exception.details(),
- expected_error_details)
- self.assertEqual(exception_context.exception.code(),
- grpc.StatusCode.INTERNAL)
- self.assertEqual(response_future.details(), expected_error_details)
- self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL)
+ with self.assertRaises(ValueError) as exception_context:
+ self._unary_unary.future(request, metadata=metadata)
def testUnaryRequestStreamResponse(self):
request = b'\x37\x58'
metadata = (('InVaLiD', 'UnaryRequestStreamResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
- response_iterator = self._unary_stream(request, metadata=metadata)
- with self.assertRaises(grpc.RpcError) as exception_context:
- next(response_iterator)
- self.assertEqual(exception_context.exception.details(),
- expected_error_details)
- self.assertEqual(exception_context.exception.code(),
- grpc.StatusCode.INTERNAL)
- self.assertEqual(response_iterator.details(), expected_error_details)
- self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL)
+ with self.assertRaises(ValueError) as exception_context:
+ self._unary_stream(request, metadata=metadata)
+ self.assertIn(expected_error_details, str(exception_context.exception))
def testStreamRequestBlockingUnaryResponse(self):
request_iterator = (
@@ -129,32 +116,18 @@ class InvalidMetadataTest(unittest.TestCase):
b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
metadata = (('InVaLiD', 'StreamRequestFutureUnaryResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
- response_future = self._stream_unary.future(
- request_iterator, metadata=metadata)
- with self.assertRaises(grpc.RpcError) as exception_context:
- response_future.result()
- self.assertEqual(exception_context.exception.details(),
- expected_error_details)
- self.assertEqual(exception_context.exception.code(),
- grpc.StatusCode.INTERNAL)
- self.assertEqual(response_future.details(), expected_error_details)
- self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL)
+ with self.assertRaises(ValueError) as exception_context:
+ self._stream_unary.future(request_iterator, metadata=metadata)
+ self.assertIn(expected_error_details, str(exception_context.exception))
def testStreamRequestStreamResponse(self):
request_iterator = (
b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
metadata = (('InVaLiD', 'StreamRequestStreamResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
- response_iterator = self._stream_stream(
- request_iterator, metadata=metadata)
- with self.assertRaises(grpc.RpcError) as exception_context:
- next(response_iterator)
- self.assertEqual(exception_context.exception.details(),
- expected_error_details)
- self.assertEqual(exception_context.exception.code(),
- grpc.StatusCode.INTERNAL)
- self.assertEqual(response_iterator.details(), expected_error_details)
- self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL)
+ with self.assertRaises(ValueError) as exception_context:
+ self._stream_stream(request_iterator, metadata=metadata)
+ self.assertIn(expected_error_details, str(exception_context.exception))
if __name__ == '__main__':
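
The updated expectations above reflect a behavioral change: structurally invalid metadata is now rejected locally at invocation time with a ValueError rather than surfacing later as an RpcError with StatusCode.INTERNAL on the returned future or iterator. A minimal sketch of the new behavior; the address and method name are hypothetical, and no server needs to be running since the error is raised before any RPC starts.

    import grpc

    channel = grpc.insecure_channel('localhost:50051')  # hypothetical address
    multi_callable = channel.unary_unary('/test/UnaryUnary')  # hypothetical method

    try:
        # The mixed-case metadata key is invalid, so the invocation itself
        # raises ValueError instead of returning a failed future.
        multi_callable.future(b'\x07\x08', metadata=(('InVaLiD', 'value'),))
    except ValueError as error:
        print('rejected at invocation time:', error)
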
diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
index e40cca8b24..93a5fdf9ff 100644
--- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
+++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
@@ -165,11 +165,13 @@ class FailAfterFewIterationsCounter(object):
def __next__(self):
if self._current >= self._high:
- raise Exception("This is a deliberate failure in a unit test.")
+ raise test_control.Defect()
else:
self._current += 1
return self._bytestring
+ next = __next__
+
def _unary_unary_multi_callable(channel):
return channel.unary_unary(_UNARY_UNARY)
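
The added `next = __next__` alias above is the usual Python 2/3 iterator-compatibility idiom: Python 3 calls __next__() while Python 2 calls next(), so aliasing one to the other lets the same class be iterated under both. A self-contained illustration of the pattern.

    class CountingIterator(object):
        """Yields 1..limit under both Python 2 and Python 3."""

        def __init__(self, limit):
            self._count = 0
            self._limit = limit

        def __iter__(self):
            return self

        def __next__(self):
            if self._count >= self._limit:
                raise StopIteration()
            self._count += 1
            return self._count

        next = __next__  # Python 2 spelling of the iterator protocol
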
diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py
index 8acba5a30b..a708d8d862 100644
--- a/src/python/grpcio_tests/tests/unit/_reconnect_test.py
+++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py
@@ -89,7 +89,10 @@ class ReconnectTest(unittest.TestCase):
multi_callable = channel.unary_unary(_UNARY_UNARY)
self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
server.stop(None)
- time.sleep(1)
+ # By default, the channel connectivity is checked every 5s.
+ # GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS can be set to change
+ # this.
+ time.sleep(5.1)
server = grpc.server(server_pool, (handler,))
server.add_insecure_port('[::]:{}'.format(port))
server.start()
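
The longer sleep above exists because client channel connectivity is re-checked by a backup poller roughly every 5 seconds by default; GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS overrides that interval. A sketch of shortening it for a test, assuming a hypothetical 250 ms value; the variable is exported before grpc is imported so it is in place before the runtime reads its configuration.

    import os

    # Hypothetical 250 ms backup poll interval; export it before importing grpc
    # so the runtime sees it when it picks up its configuration.
    os.environ['GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS'] = '250'

    import grpc  # noqa: E402  (imported after configuring the environment)

    channel = grpc.insecure_channel('localhost:50051')  # hypothetical address
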
diff --git a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py
deleted file mode 100644
index 18f5af058a..0000000000
--- a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py
+++ /dev/null
@@ -1,115 +0,0 @@
-# Copyright 2016 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Tests for CleanupThread."""
-
-import threading
-import time
-import unittest
-
-from grpc import _common
-
-_SHORT_TIME = 0.5
-_LONG_TIME = 5.0
-_EPSILON = 0.5
-
-
-def cleanup(timeout):
- if timeout is not None:
- time.sleep(timeout)
- else:
- time.sleep(_LONG_TIME)
-
-
-def slow_cleanup(timeout):
- # Don't respect timeout
- time.sleep(_LONG_TIME)
-
-
-class CleanupThreadTest(unittest.TestCase):
-
- def testTargetInvocation(self):
- event = threading.Event()
-
- def target(arg1, arg2, arg3=None):
- self.assertEqual('arg1', arg1)
- self.assertEqual('arg2', arg2)
- self.assertEqual('arg3', arg3)
- event.set()
-
- cleanup_thread = _common.CleanupThread(
- behavior=lambda x: None,
- target=target,
- name='test-name',
- args=('arg1', 'arg2'),
- kwargs={
- 'arg3': 'arg3'
- })
- cleanup_thread.start()
- cleanup_thread.join()
- self.assertEqual(cleanup_thread.name, 'test-name')
- self.assertTrue(event.is_set())
-
- def testJoinNoTimeout(self):
- cleanup_thread = _common.CleanupThread(behavior=cleanup)
- cleanup_thread.start()
- start_time = time.time()
- cleanup_thread.join()
- end_time = time.time()
- self.assertAlmostEqual(
- _LONG_TIME, end_time - start_time, delta=_EPSILON)
-
- def testJoinTimeout(self):
- cleanup_thread = _common.CleanupThread(behavior=cleanup)
- cleanup_thread.start()
- start_time = time.time()
- cleanup_thread.join(_SHORT_TIME)
- end_time = time.time()
- self.assertAlmostEqual(
- _SHORT_TIME, end_time - start_time, delta=_EPSILON)
-
- def testJoinTimeoutSlowBehavior(self):
- cleanup_thread = _common.CleanupThread(behavior=slow_cleanup)
- cleanup_thread.start()
- start_time = time.time()
- cleanup_thread.join(_SHORT_TIME)
- end_time = time.time()
- self.assertAlmostEqual(
- _LONG_TIME, end_time - start_time, delta=_EPSILON)
-
- def testJoinTimeoutSlowTarget(self):
- event = threading.Event()
-
- def target():
- event.wait(_LONG_TIME)
-
- cleanup_thread = _common.CleanupThread(behavior=cleanup, target=target)
- cleanup_thread.start()
- start_time = time.time()
- cleanup_thread.join(_SHORT_TIME)
- end_time = time.time()
- self.assertAlmostEqual(
- _SHORT_TIME, end_time - start_time, delta=_EPSILON)
- event.set()
-
- def testJoinZeroTimeout(self):
- cleanup_thread = _common.CleanupThread(behavior=cleanup)
- cleanup_thread.start()
- start_time = time.time()
- cleanup_thread.join(0)
- end_time = time.time()
- self.assertAlmostEqual(0, end_time - start_time, delta=_EPSILON)
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
index 61c03f64ba..b43c647fc9 100644
--- a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
+++ b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
@@ -65,7 +65,7 @@ class _Servicer(object):
self._serviced = True
self._condition.notify_all()
return
- yield
+ yield # pylint: disable=unreachable
def stream_unary(self, request_iterator, context):
for request in request_iterator:
diff --git a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py b/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py
deleted file mode 100644
index c99738e085..0000000000
--- a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py
+++ /dev/null
@@ -1,132 +0,0 @@
-# Copyright 2015 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Tests Face interface compliance of the gRPC Python Beta API."""
-
-import collections
-import unittest
-
-import six
-
-from grpc.beta import implementations
-from grpc.beta import interfaces
-from tests.unit import resources
-from tests.unit import test_common as grpc_test_common
-from tests.unit.beta import test_utilities
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.face import test_cases
-from tests.unit.framework.interfaces.face import test_interfaces
-
-_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
-
-
-class _SerializationBehaviors(
- collections.namedtuple('_SerializationBehaviors', (
- 'request_serializers',
- 'request_deserializers',
- 'response_serializers',
- 'response_deserializers',
- ))):
- pass
-
-
-def _serialization_behaviors_from_test_methods(test_methods):
- request_serializers = {}
- request_deserializers = {}
- response_serializers = {}
- response_deserializers = {}
- for (group, method), test_method in six.iteritems(test_methods):
- request_serializers[group, method] = test_method.serialize_request
- request_deserializers[group, method] = test_method.deserialize_request
- response_serializers[group, method] = test_method.serialize_response
- response_deserializers[group, method] = test_method.deserialize_response
- return _SerializationBehaviors(request_serializers, request_deserializers,
- response_serializers, response_deserializers)
-
-
-class _Implementation(test_interfaces.Implementation):
-
- def instantiate(self, methods, method_implementations,
- multi_method_implementation):
- serialization_behaviors = _serialization_behaviors_from_test_methods(
- methods)
- # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
- service = next(iter(methods))[0]
- # TODO(nathaniel): Add a "cardinalities_by_group" attribute to
- # _digest.TestServiceDigest.
- cardinalities = {
- method: method_object.cardinality()
- for (group, method), method_object in six.iteritems(methods)
- }
-
- server_options = implementations.server_options(
- request_deserializers=serialization_behaviors.request_deserializers,
- response_serializers=serialization_behaviors.response_serializers,
- thread_pool_size=test_constants.POOL_SIZE)
- server = implementations.server(
- method_implementations, options=server_options)
- server_credentials = implementations.ssl_server_credentials([
- (
- resources.private_key(),
- resources.certificate_chain(),
- ),
- ])
- port = server.add_secure_port('[::]:0', server_credentials)
- server.start()
- channel_credentials = implementations.ssl_channel_credentials(
- resources.test_root_certificates())
- channel = test_utilities.not_really_secure_channel(
- 'localhost', port, channel_credentials, _SERVER_HOST_OVERRIDE)
- stub_options = implementations.stub_options(
- request_serializers=serialization_behaviors.request_serializers,
- response_deserializers=serialization_behaviors.
- response_deserializers,
- thread_pool_size=test_constants.POOL_SIZE)
- generic_stub = implementations.generic_stub(
- channel, options=stub_options)
- dynamic_stub = implementations.dynamic_stub(
- channel, service, cardinalities, options=stub_options)
- return generic_stub, {service: dynamic_stub}, server
-
- def destantiate(self, memo):
- memo.stop(test_constants.SHORT_TIMEOUT).wait()
-
- def invocation_metadata(self):
- return grpc_test_common.INVOCATION_INITIAL_METADATA
-
- def initial_metadata(self):
- return grpc_test_common.SERVICE_INITIAL_METADATA
-
- def terminal_metadata(self):
- return grpc_test_common.SERVICE_TERMINAL_METADATA
-
- def code(self):
- return interfaces.StatusCode.OK
-
- def details(self):
- return grpc_test_common.DETAILS
-
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- return original_metadata is None or grpc_test_common.metadata_transmitted(
- original_metadata, transmitted_metadata)
-
-
-def load_tests(loader, tests, pattern):
- return unittest.TestSuite(
- tests=tuple(
- loader.loadTestsFromTestCase(test_case_class)
- for test_case_class in test_cases.test_cases(_Implementation())))
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py
deleted file mode 100644
index 5fb4f3c3cf..0000000000
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2015 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
deleted file mode 100644
index 5d8679aa62..0000000000
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ /dev/null
@@ -1,287 +0,0 @@
-# Copyright 2015 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Test code for the Face layer of RPC Framework."""
-
-from __future__ import division
-
-import abc
-import itertools
-import unittest
-from concurrent import futures
-
-import six
-
-# test_interfaces is referenced from specification in this module.
-from grpc.framework.foundation import logging_pool
-from grpc.framework.interfaces.face import face
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.common import test_control
-from tests.unit.framework.common import test_coverage
-from tests.unit.framework.interfaces.face import _3069_test_constant
-from tests.unit.framework.interfaces.face import _digest
-from tests.unit.framework.interfaces.face import _stock_service
-from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
-
-
-class TestCase(
- six.with_metaclass(abc.ABCMeta, test_coverage.Coverage,
- unittest.TestCase)):
- """A test of the Face layer of RPC Framework.
-
- Concrete subclasses must have an "implementation" attribute of type
- test_interfaces.Implementation and an "invoker_constructor" attribute of type
- _invocation.InvokerConstructor.
- """
-
- NAME = 'BlockingInvocationInlineServiceTest'
-
- def setUp(self):
- """See unittest.TestCase.setUp for full specification.
-
- Overriding implementations must call this implementation.
- """
- self._control = test_control.PauseFailControl()
- self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE,
- self._control, None)
-
- generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
- self._digest.methods, self._digest.inline_method_implementations,
- None)
- self._invoker = self.invoker_constructor.construct_invoker(
- generic_stub, dynamic_stubs, self._digest.methods)
-
- def tearDown(self):
- """See unittest.TestCase.tearDown for full specification.
-
- Overriding implementations must call this implementation.
- """
- self._invoker = None
- self.implementation.destantiate(self._memo)
-
- def testSuccessfulUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- response, call = self._invoker.blocking(group, method)(
- request, test_constants.LONG_TIMEOUT, with_call=True)
-
- test_messages.verify(request, response, self)
-
- def testSuccessfulUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- response_iterator = self._invoker.blocking(group, method)(
- request, test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(request, responses, self)
-
- def testSuccessfulStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- response, call = self._invoker.blocking(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT, with_call=True)
-
- test_messages.verify(requests, response, self)
-
- def testSuccessfulStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- response_iterator = self._invoker.blocking(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(requests, responses, self)
-
- def testSequentialInvocations(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response = self._invoker.blocking(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
-
- test_messages.verify(first_request, first_response, self)
-
- second_response = self._invoker.blocking(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
-
- test_messages.verify(second_request, second_response, self)
-
- def testParallelInvocations(self):
- pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = []
- response_futures = []
- for _ in range(test_constants.THREAD_CONCURRENCY):
- request = test_messages.request()
- response_future = pool.submit(
- self._invoker.blocking(group, method), request,
- test_constants.LONG_TIMEOUT)
- requests.append(request)
- response_futures.append(response_future)
-
- responses = [
- response_future.result()
- for response_future in response_futures
- ]
-
- for request, response in zip(requests, responses):
- test_messages.verify(request, response, self)
- pool.shutdown(wait=True)
-
- def testWaitingForSomeButNotAllParallelInvocations(self):
- pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = []
- response_futures_to_indices = {}
- for index in range(test_constants.THREAD_CONCURRENCY):
- request = test_messages.request()
- response_future = pool.submit(
- self._invoker.blocking(group, method), request,
- test_constants.LONG_TIMEOUT)
- requests.append(request)
- response_futures_to_indices[response_future] = index
-
- some_completed_response_futures_iterator = itertools.islice(
- futures.as_completed(response_futures_to_indices),
- test_constants.THREAD_CONCURRENCY // 2)
- for response_future in some_completed_response_futures_iterator:
- index = response_futures_to_indices[response_future]
- test_messages.verify(requests[index],
- response_future.result(), self)
- pool.shutdown(wait=True)
-
- @unittest.skip('Cancellation impossible with blocking control flow!')
- def testCancelledUnaryRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @unittest.skip('Cancellation impossible with blocking control flow!')
- def testCancelledUnaryRequestStreamResponse(self):
- raise NotImplementedError()
-
- @unittest.skip('Cancellation impossible with blocking control flow!')
- def testCancelledStreamRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @unittest.skip('Cancellation impossible with blocking control flow!')
- def testCancelledStreamRequestStreamResponse(self):
- raise NotImplementedError()
-
- def testExpiredUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.pause(), self.assertRaises(
- face.ExpirationError):
- self._invoker.blocking(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
-
- def testExpiredUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.pause(), self.assertRaises(
- face.ExpirationError):
- response_iterator = self._invoker.blocking(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- list(response_iterator)
-
- def testExpiredStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.pause(), self.assertRaises(
- face.ExpirationError):
- self._invoker.blocking(
- group, method)(iter(requests),
- _3069_test_constant.REALLY_SHORT_TIMEOUT)
-
- def testExpiredStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.pause(), self.assertRaises(
- face.ExpirationError):
- response_iterator = self._invoker.blocking(
- group, method)(iter(requests),
- _3069_test_constant.REALLY_SHORT_TIMEOUT)
- list(response_iterator)
-
- def testFailedUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.fail(), self.assertRaises(face.RemoteError):
- self._invoker.blocking(group, method)(
- request, test_constants.LONG_TIMEOUT)
-
- def testFailedUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.fail(), self.assertRaises(face.RemoteError):
- response_iterator = self._invoker.blocking(group, method)(
- request, test_constants.LONG_TIMEOUT)
- list(response_iterator)
-
- def testFailedStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.fail(), self.assertRaises(face.RemoteError):
- self._invoker.blocking(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
-
- def testFailedStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.fail(), self.assertRaises(face.RemoteError):
- response_iterator = self._invoker.blocking(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
- list(response_iterator)
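The testWaitingForSomeButNotAllParallelInvocations test above rests on a generally useful pattern: futures.as_completed yields futures in completion order, and itertools.islice lets the caller stop after consuming only some of them. A standalone sketch of the same idea, with an invented worker function standing in for a blocking RPC:

    import itertools
    import time
    from concurrent import futures

    _CONCURRENCY = 8


    def _slow_echo(value):
        # Hypothetical stand-in for a blocking RPC invocation.
        time.sleep(0.01 * value)
        return value


    with futures.ThreadPoolExecutor(max_workers=_CONCURRENCY) as pool:
        futures_to_indices = {
            pool.submit(_slow_echo, index): index
            for index in range(_CONCURRENCY)
        }
        # Consume only the first half of the futures to complete; the rest
        # may still be running or pending when the loop exits.
        some_completed = itertools.islice(
            futures.as_completed(futures_to_indices), _CONCURRENCY // 2)
        for completed_future in some_completed:
            index = futures_to_indices[completed_future]
            assert completed_future.result() == index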
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py
deleted file mode 100644
index b1c33da43a..0000000000
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py
+++ /dev/null
@@ -1,432 +0,0 @@
-# Copyright 2015 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Code for making a service.TestService more amenable to use in tests."""
-
-import collections
-import threading
-
-import six
-
-# test_control, _service, and test_interfaces are referenced from specification
-# in this module.
-from grpc.framework.common import cardinality
-from grpc.framework.common import style
-from grpc.framework.foundation import stream
-from grpc.framework.foundation import stream_util
-from grpc.framework.interfaces.face import face
-from tests.unit.framework.common import test_control # pylint: disable=unused-import
-from tests.unit.framework.interfaces.face import _service # pylint: disable=unused-import
-from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
-
-_IDENTITY = lambda x: x
-
-
-class TestServiceDigest(
- collections.namedtuple('TestServiceDigest', (
- 'methods',
- 'inline_method_implementations',
- 'event_method_implementations',
- 'multi_method_implementation',
- 'unary_unary_messages_sequences',
- 'unary_stream_messages_sequences',
- 'stream_unary_messages_sequences',
- 'stream_stream_messages_sequences',
- ))):
- """A transformation of a service.TestService.
-
- Attributes:
- methods: A dict from method group-name pair to test_interfaces.Method object
- describing the RPC methods that may be called during the test.
- inline_method_implementations: A dict from method group-name pair to
- face.MethodImplementation object to be used in tests of in-line calls to
- behaviors under test.
- event_method_implementations: A dict from method group-name pair to
- face.MethodImplementation object to be used in tests of event-driven calls
- to behaviors under test.
- multi_method_implementation: A face.MultiMethodImplementation to be used in
- tests of generic calls to behaviors under test.
- unary_unary_messages_sequences: A dict from method group-name pair to
- sequence of service.UnaryUnaryTestMessages objects to be used to test the
- identified method.
- unary_stream_messages_sequences: A dict from method group-name pair to
- sequence of service.UnaryStreamTestMessages objects to be used to test the
- identified method.
- stream_unary_messages_sequences: A dict from method group-name pair to
- sequence of service.StreamUnaryTestMessages objects to be used to test the
- identified method.
- stream_stream_messages_sequences: A dict from method group-name pair to
- sequence of service.StreamStreamTestMessages objects to be used to test
- the identified method.
- """
-
-
-class _BufferingConsumer(stream.Consumer):
- """A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
-
- def __init__(self):
- self.consumed = []
- self.terminated = False
-
- def consume(self, value):
- self.consumed.append(value)
-
- def terminate(self):
- self.terminated = True
-
- def consume_and_terminate(self, value):
- self.consumed.append(value)
- self.terminated = True
-
-
-class _InlineUnaryUnaryMethod(face.MethodImplementation):
-
- def __init__(self, unary_unary_test_method, control):
- self._test_method = unary_unary_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.UNARY_UNARY
- self.style = style.Service.INLINE
-
- def unary_unary_inline(self, request, context):
- response_list = []
- self._test_method.service(request, response_list.append, context,
- self._control)
- return response_list.pop(0)
-
-
-class _EventUnaryUnaryMethod(face.MethodImplementation):
-
- def __init__(self, unary_unary_test_method, control, pool):
- self._test_method = unary_unary_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.UNARY_UNARY
- self.style = style.Service.EVENT
-
- def unary_unary_event(self, request, response_callback, context):
- if self._pool is None:
- self._test_method.service(request, response_callback, context,
- self._control)
- else:
- self._pool.submit(self._test_method.service, request,
- response_callback, context, self._control)
-
-
-class _InlineUnaryStreamMethod(face.MethodImplementation):
-
- def __init__(self, unary_stream_test_method, control):
- self._test_method = unary_stream_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.UNARY_STREAM
- self.style = style.Service.INLINE
-
- def unary_stream_inline(self, request, context):
- response_consumer = _BufferingConsumer()
- self._test_method.service(request, response_consumer, context,
- self._control)
- for response in response_consumer.consumed:
- yield response
-
-
-class _EventUnaryStreamMethod(face.MethodImplementation):
-
- def __init__(self, unary_stream_test_method, control, pool):
- self._test_method = unary_stream_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.UNARY_STREAM
- self.style = style.Service.EVENT
-
- def unary_stream_event(self, request, response_consumer, context):
- if self._pool is None:
- self._test_method.service(request, response_consumer, context,
- self._control)
- else:
- self._pool.submit(self._test_method.service, request,
- response_consumer, context, self._control)
-
-
-class _InlineStreamUnaryMethod(face.MethodImplementation):
-
- def __init__(self, stream_unary_test_method, control):
- self._test_method = stream_unary_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.STREAM_UNARY
- self.style = style.Service.INLINE
-
- def stream_unary_inline(self, request_iterator, context):
- response_list = []
- request_consumer = self._test_method.service(response_list.append,
- context, self._control)
- for request in request_iterator:
- request_consumer.consume(request)
- request_consumer.terminate()
- return response_list.pop(0)
-
-
-class _EventStreamUnaryMethod(face.MethodImplementation):
-
- def __init__(self, stream_unary_test_method, control, pool):
- self._test_method = stream_unary_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.STREAM_UNARY
- self.style = style.Service.EVENT
-
- def stream_unary_event(self, response_callback, context):
- request_consumer = self._test_method.service(response_callback, context,
- self._control)
- if self._pool is None:
- return request_consumer
- else:
- return stream_util.ThreadSwitchingConsumer(request_consumer,
- self._pool)
-
-
-class _InlineStreamStreamMethod(face.MethodImplementation):
-
- def __init__(self, stream_stream_test_method, control):
- self._test_method = stream_stream_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.STREAM_STREAM
- self.style = style.Service.INLINE
-
- def stream_stream_inline(self, request_iterator, context):
- response_consumer = _BufferingConsumer()
- request_consumer = self._test_method.service(response_consumer, context,
- self._control)
-
- for request in request_iterator:
- request_consumer.consume(request)
- while response_consumer.consumed:
- yield response_consumer.consumed.pop(0)
- response_consumer.terminate()
-
-
-class _EventStreamStreamMethod(face.MethodImplementation):
-
- def __init__(self, stream_stream_test_method, control, pool):
- self._test_method = stream_stream_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.STREAM_STREAM
- self.style = style.Service.EVENT
-
- def stream_stream_event(self, response_consumer, context):
- request_consumer = self._test_method.service(response_consumer, context,
- self._control)
- if self._pool is None:
- return request_consumer
- else:
- return stream_util.ThreadSwitchingConsumer(request_consumer,
- self._pool)
-
-
-class _UnaryConsumer(stream.Consumer):
- """A Consumer that only allows consumption of exactly one value."""
-
- def __init__(self, action):
- self._lock = threading.Lock()
- self._action = action
- self._consumed = False
- self._terminated = False
-
- def consume(self, value):
- with self._lock:
- if self._consumed:
- raise ValueError('Unary consumer already consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._consumed = True
-
- self._action(value)
-
- def terminate(self):
- with self._lock:
- if not self._consumed:
- raise ValueError('Unary consumer hasn\'t yet consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._terminated = True
-
- def consume_and_terminate(self, value):
- with self._lock:
- if self._consumed:
- raise ValueError('Unary consumer already consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._consumed = True
- self._terminated = True
-
- self._action(value)
-
-
-class _UnaryUnaryAdaptation(object):
-
- def __init__(self, unary_unary_test_method):
- self._method = unary_unary_test_method
-
- def service(self, response_consumer, context, control):
-
- def action(request):
- self._method.service(request,
- response_consumer.consume_and_terminate,
- context, control)
-
- return _UnaryConsumer(action)
-
-
-class _UnaryStreamAdaptation(object):
-
- def __init__(self, unary_stream_test_method):
- self._method = unary_stream_test_method
-
- def service(self, response_consumer, context, control):
-
- def action(request):
- self._method.service(request, response_consumer, context, control)
-
- return _UnaryConsumer(action)
-
-
-class _StreamUnaryAdaptation(object):
-
- def __init__(self, stream_unary_test_method):
- self._method = stream_unary_test_method
-
- def service(self, response_consumer, context, control):
- return self._method.service(response_consumer.consume_and_terminate,
- context, control)
-
-
-class _MultiMethodImplementation(face.MultiMethodImplementation):
-
- def __init__(self, methods, control, pool):
- self._methods = methods
- self._control = control
- self._pool = pool
-
- def service(self, group, name, response_consumer, context):
- method = self._methods.get(group, name, None)
- if method is None:
- raise face.NoSuchMethodError(group, name)
- elif self._pool is None:
- return method(response_consumer, context, self._control)
- else:
- request_consumer = method(response_consumer, context, self._control)
- return stream_util.ThreadSwitchingConsumer(request_consumer,
- self._pool)
-
-
-class _Assembly(
- collections.namedtuple(
- '_Assembly',
- ['methods', 'inlines', 'events', 'adaptations', 'messages'])):
- """An intermediate structure created when creating a TestServiceDigest."""
-
-
-def _assemble(scenarios, identifiers, inline_method_constructor,
- event_method_constructor, adapter, control, pool):
- """Creates an _Assembly from the given scenarios."""
- methods = {}
- inlines = {}
- events = {}
- adaptations = {}
- messages = {}
- for identifier, scenario in six.iteritems(scenarios):
- if identifier in identifiers:
- raise ValueError('Repeated identifier "(%s, %s)"!' % identifier)
-
- test_method = scenario[0]
- inline_method = inline_method_constructor(test_method, control)
- event_method = event_method_constructor(test_method, control, pool)
- adaptation = adapter(test_method)
-
- methods[identifier] = test_method
- inlines[identifier] = inline_method
- events[identifier] = event_method
- adaptations[identifier] = adaptation
- messages[identifier] = scenario[1]
-
- return _Assembly(methods, inlines, events, adaptations, messages)
-
-
-def digest(service, control, pool):
- """Creates a TestServiceDigest from a TestService.
-
- Args:
- service: A _service.TestService.
- control: A test_control.Control.
- pool: If RPC methods should be serviced in a separate thread, a thread pool.
- None if RPC methods should be serviced in the thread belonging to the
- run-time that calls for their service.
-
- Returns:
- A TestServiceDigest synthesized from the given service.TestService.
- """
- identifiers = set()
-
- unary_unary = _assemble(service.unary_unary_scenarios(), identifiers,
- _InlineUnaryUnaryMethod, _EventUnaryUnaryMethod,
- _UnaryUnaryAdaptation, control, pool)
- identifiers.update(unary_unary.inlines)
-
- unary_stream = _assemble(service.unary_stream_scenarios(), identifiers,
- _InlineUnaryStreamMethod, _EventUnaryStreamMethod,
- _UnaryStreamAdaptation, control, pool)
- identifiers.update(unary_stream.inlines)
-
- stream_unary = _assemble(service.stream_unary_scenarios(), identifiers,
- _InlineStreamUnaryMethod, _EventStreamUnaryMethod,
- _StreamUnaryAdaptation, control, pool)
- identifiers.update(stream_unary.inlines)
-
- stream_stream = _assemble(service.stream_stream_scenarios(), identifiers,
- _InlineStreamStreamMethod,
- _EventStreamStreamMethod, _IDENTITY, control,
- pool)
- identifiers.update(stream_stream.inlines)
-
- methods = dict(unary_unary.methods)
- methods.update(unary_stream.methods)
- methods.update(stream_unary.methods)
- methods.update(stream_stream.methods)
- adaptations = dict(unary_unary.adaptations)
- adaptations.update(unary_stream.adaptations)
- adaptations.update(stream_unary.adaptations)
- adaptations.update(stream_stream.adaptations)
- inlines = dict(unary_unary.inlines)
- inlines.update(unary_stream.inlines)
- inlines.update(stream_unary.inlines)
- inlines.update(stream_stream.inlines)
- events = dict(unary_unary.events)
- events.update(unary_stream.events)
- events.update(stream_unary.events)
- events.update(stream_stream.events)
-
- return TestServiceDigest(methods, inlines, events,
- _MultiMethodImplementation(adaptations, control,
- pool),
- unary_unary.messages, unary_stream.messages,
- stream_unary.messages, stream_stream.messages)
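The inline adapters above bridge callback-style test services to iterator-style results via the stream.Consumer interface. A self-contained sketch of that bridge, without the grpc.framework imports (the counting service below is invented for illustration):

    class _BufferingConsumer(object):
        """Collects consumed values so they can be re-emitted as an iterator."""

        def __init__(self):
            self.consumed = []
            self.terminated = False

        def consume(self, value):
            self.consumed.append(value)

        def terminate(self):
            self.terminated = True


    def _counting_service(request, response_consumer):
        # Hypothetical callback-style service: streams request-many responses.
        for index in range(request):
            response_consumer.consume(index)
        response_consumer.terminate()


    def unary_stream_inline(request):
        # Mirrors _InlineUnaryStreamMethod: run the service against a buffering
        # consumer, then yield whatever it produced.
        consumer = _BufferingConsumer()
        _counting_service(request, consumer)
        for response in consumer.consumed:
            yield response


    assert list(unary_stream_inline(3)) == [0, 1, 2]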
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
deleted file mode 100644
index 3d9b2816aa..0000000000
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ /dev/null
@@ -1,508 +0,0 @@
-# Copyright 2015 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Test code for the Face layer of RPC Framework."""
-
-from __future__ import division
-
-import abc
-import contextlib
-import itertools
-import threading
-import unittest
-from concurrent import futures
-
-import six
-
-# test_interfaces is referenced from specification in this module.
-from grpc.framework.foundation import future
-from grpc.framework.foundation import logging_pool
-from grpc.framework.interfaces.face import face
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.common import test_control
-from tests.unit.framework.common import test_coverage
-from tests.unit.framework.interfaces.face import _3069_test_constant
-from tests.unit.framework.interfaces.face import _digest
-from tests.unit.framework.interfaces.face import _stock_service
-from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
-
-
-class _PauseableIterator(object):
-
- def __init__(self, upstream):
- self._upstream = upstream
- self._condition = threading.Condition()
- self._paused = False
-
- @contextlib.contextmanager
- def pause(self):
- with self._condition:
- self._paused = True
- yield
- with self._condition:
- self._paused = False
- self._condition.notify_all()
-
- def __iter__(self):
- return self
-
- def __next__(self):
- return self.next()
-
- def next(self):
- with self._condition:
- while self._paused:
- self._condition.wait()
- return next(self._upstream)
-
-
-class _Callback(object):
-
- def __init__(self):
- self._condition = threading.Condition()
- self._called = False
- self._passed_future = None
- self._passed_other_stuff = None
-
- def __call__(self, *args, **kwargs):
- with self._condition:
- self._called = True
- if args:
- self._passed_future = args[0]
- if 1 < len(args) or kwargs:
- self._passed_other_stuff = tuple(args[1:]), dict(kwargs)
- self._condition.notify_all()
-
- def future(self):
- with self._condition:
- while True:
- if self._passed_other_stuff is not None:
- raise ValueError(
-                        'Test callback passed unexpected values: %s' %
-                        (self._passed_other_stuff,))
- elif self._called:
- return self._passed_future
- else:
- self._condition.wait()
-
-
-class TestCase(
- six.with_metaclass(abc.ABCMeta, test_coverage.Coverage,
- unittest.TestCase)):
- """A test of the Face layer of RPC Framework.
-
- Concrete subclasses must have an "implementation" attribute of type
- test_interfaces.Implementation and an "invoker_constructor" attribute of type
- _invocation.InvokerConstructor.
- """
-
- NAME = 'FutureInvocationAsynchronousEventServiceTest'
-
- def setUp(self):
- """See unittest.TestCase.setUp for full specification.
-
- Overriding implementations must call this implementation.
- """
- self._control = test_control.PauseFailControl()
- self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE)
- self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE,
- self._control, self._digest_pool)
-
- generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
- self._digest.methods, self._digest.event_method_implementations,
- None)
- self._invoker = self.invoker_constructor.construct_invoker(
- generic_stub, dynamic_stubs, self._digest.methods)
-
- def tearDown(self):
- """See unittest.TestCase.tearDown for full specification.
-
- Overriding implementations must call this implementation.
- """
- self._invoker = None
- self.implementation.destantiate(self._memo)
- self._digest_pool.shutdown(wait=True)
-
- def testSuccessfulUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
-
- response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- response = response_future.result()
-
- test_messages.verify(request, response, self)
- self.assertIs(callback.future(), response_future)
- self.assertIsNone(response_future.exception())
- self.assertIsNone(response_future.traceback())
-
- def testSuccessfulUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- response_iterator = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(request, responses, self)
-
- def testSuccessfulStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- request_iterator = _PauseableIterator(iter(requests))
- callback = _Callback()
-
- # Use of a paused iterator of requests allows us to test that control is
- # returned to calling code before the iterator yields any requests.
- with request_iterator.pause():
- response_future = self._invoker.future(group, method)(
- request_iterator, test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- future_passed_to_callback = callback.future()
- response = future_passed_to_callback.result()
-
- test_messages.verify(requests, response, self)
- self.assertIs(future_passed_to_callback, response_future)
- self.assertIsNone(response_future.exception())
- self.assertIsNone(response_future.traceback())
-
- def testSuccessfulStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- request_iterator = _PauseableIterator(iter(requests))
-
- # Use of a paused iterator of requests allows us to test that control is
- # returned to calling code before the iterator yields any requests.
- with request_iterator.pause():
- response_iterator = self._invoker.future(group, method)(
- request_iterator, test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(requests, responses, self)
-
- def testSequentialInvocations(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response_future = self._invoker.future(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
- first_response = first_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
-
- second_response_future = self._invoker.future(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
- second_response = second_response_future.result()
-
- test_messages.verify(second_request, second_response, self)
-
- def testParallelInvocations(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response_future = self._invoker.future(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
- second_response_future = self._invoker.future(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
- first_response = first_response_future.result()
- second_response = second_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = []
- response_futures = []
- for _ in range(test_constants.THREAD_CONCURRENCY):
- request = test_messages.request()
- response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- requests.append(request)
- response_futures.append(response_future)
-
- responses = [
- response_future.result()
- for response_future in response_futures
- ]
-
- for request, response in zip(requests, responses):
- test_messages.verify(request, response, self)
-
- def testWaitingForSomeButNotAllParallelInvocations(self):
- pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = []
- response_futures_to_indices = {}
- for index in range(test_constants.THREAD_CONCURRENCY):
- request = test_messages.request()
- inner_response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- outer_response_future = pool.submit(
- inner_response_future.result)
- requests.append(request)
- response_futures_to_indices[outer_response_future] = index
-
- some_completed_response_futures_iterator = itertools.islice(
- futures.as_completed(response_futures_to_indices),
- test_constants.THREAD_CONCURRENCY // 2)
- for response_future in some_completed_response_futures_iterator:
- index = response_futures_to_indices[response_future]
- test_messages.verify(requests[index],
- response_future.result(), self)
- pool.shutdown(wait=True)
-
- def testCancelledUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- cancel_method_return_value = response_future.cancel()
-
- self.assertIs(callback.future(), response_future)
- self.assertFalse(cancel_method_return_value)
- self.assertTrue(response_future.cancelled())
- with self.assertRaises(future.CancelledError):
- response_future.result()
- with self.assertRaises(future.CancelledError):
- response_future.exception()
- with self.assertRaises(future.CancelledError):
- response_future.traceback()
-
- def testCancelledUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.pause():
- response_iterator = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- response_iterator.cancel()
-
- with self.assertRaises(face.CancellationError):
- next(response_iterator)
-
- def testCancelledStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- cancel_method_return_value = response_future.cancel()
-
- self.assertIs(callback.future(), response_future)
- self.assertFalse(cancel_method_return_value)
- self.assertTrue(response_future.cancelled())
- with self.assertRaises(future.CancelledError):
- response_future.result()
- with self.assertRaises(future.CancelledError):
- response_future.exception()
- with self.assertRaises(future.CancelledError):
- response_future.traceback()
-
- def testCancelledStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.pause():
- response_iterator = self._invoker.future(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
- response_iterator.cancel()
-
- with self.assertRaises(face.CancellationError):
- next(response_iterator)
-
- def testExpiredUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
- self.assertIs(callback.future(), response_future)
- self.assertIsInstance(response_future.exception(),
- face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
- self.assertIsInstance(response_future.exception(),
- face.AbortionError)
- self.assertIsNotNone(response_future.traceback())
-
- def testExpiredUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.pause():
- response_iterator = self._invoker.future(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- with self.assertRaises(face.ExpirationError):
- list(response_iterator)
-
- def testExpiredStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(
- group, method)(iter(requests),
- _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
- self.assertIs(callback.future(), response_future)
- self.assertIsInstance(response_future.exception(),
- face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
- self.assertIsInstance(response_future.exception(),
- face.AbortionError)
- self.assertIsNotNone(response_future.traceback())
-
- def testExpiredStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.pause():
- response_iterator = self._invoker.future(
- group, method)(iter(requests),
- _3069_test_constant.REALLY_SHORT_TIMEOUT)
- with self.assertRaises(face.ExpirationError):
- list(response_iterator)
-
- def testFailedUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
- abortion_callback = _Callback()
-
- with self._control.fail():
- response_future = self._invoker.future(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
- response_future.add_abortion_callback(abortion_callback)
-
- self.assertIs(callback.future(), response_future)
- # Because the servicer fails outside of the thread from which the
-                # servicer-side runtime called into it, its failure is
- # indistinguishable from simply not having called its
- # response_callback before the expiration of the RPC.
- self.assertIsInstance(response_future.exception(),
- face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
- self.assertIsNotNone(response_future.traceback())
- self.assertIsNotNone(abortion_callback.future())
-
- def testFailedUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- # Because the servicer fails outside of the thread from which the
-            # servicer-side runtime called into it, its failure is indistinguishable
- # from simply not having called its response_consumer before the
- # expiration of the RPC.
- with self._control.fail(), self.assertRaises(
- face.ExpirationError):
- response_iterator = self._invoker.future(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- list(response_iterator)
-
- def testFailedStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = _Callback()
- abortion_callback = _Callback()
-
- with self._control.fail():
- response_future = self._invoker.future(
- group, method)(iter(requests),
- _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
- response_future.add_abortion_callback(abortion_callback)
-
- self.assertIs(callback.future(), response_future)
- # Because the servicer fails outside of the thread from which the
-                # servicer-side runtime called into it, its failure is
- # indistinguishable from simply not having called its
- # response_callback before the expiration of the RPC.
- self.assertIsInstance(response_future.exception(),
- face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
- self.assertIsNotNone(response_future.traceback())
- self.assertIsNotNone(abortion_callback.future())
-
- def testFailedStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (six.iteritems(
- self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- # Because the servicer fails outside of the thread from which the
-            # servicer-side runtime called into it, its failure is indistinguishable
- # from simply not having called its response_consumer before the
- # expiration of the RPC.
- with self._control.fail(), self.assertRaises(
- face.ExpirationError):
- response_iterator = self._invoker.future(
- group, method)(iter(requests),
- _3069_test_constant.REALLY_SHORT_TIMEOUT)
- list(response_iterator)
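The request-pausing trick used above, an iterator that blocks in next() until a pause() context exits, is what lets these tests prove that control returns to the caller before any request is pulled. A condensed standalone sketch of the same helper:

    import contextlib
    import threading


    class PauseableIterator(object):
        """Wraps an iterator; next() blocks while the iterator is paused."""

        def __init__(self, upstream):
            self._upstream = upstream
            self._condition = threading.Condition()
            self._paused = False

        @contextlib.contextmanager
        def pause(self):
            with self._condition:
                self._paused = True
            yield
            with self._condition:
                self._paused = False
                self._condition.notify_all()

        def __iter__(self):
            return self

        def __next__(self):
            with self._condition:
                while self._paused:
                    self._condition.wait()
                return next(self._upstream)

        next = __next__  # Python 2 compatibility, as in the deleted module.


    it = PauseableIterator(iter([1, 2, 3]))
    with it.pause():
        # While paused, a consumer thread calling next(it) would block here.
        pass
    assert list(it) == [1, 2, 3]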
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py
deleted file mode 100644
index efc93d56b0..0000000000
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py
+++ /dev/null
@@ -1,198 +0,0 @@
-# Copyright 2015 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Coverage across the Face layer's generic-to-dynamic range for invocation."""
-
-import abc
-
-import six
-
-from grpc.framework.common import cardinality
-
-_CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR = {
- cardinality.Cardinality.UNARY_UNARY: 'blocking_unary_unary',
- cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream',
- cardinality.Cardinality.STREAM_UNARY: 'blocking_stream_unary',
- cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream',
-}
-
-_CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR = {
- cardinality.Cardinality.UNARY_UNARY: 'future_unary_unary',
- cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream',
- cardinality.Cardinality.STREAM_UNARY: 'future_stream_unary',
- cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream',
-}
-
-_CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR = {
- cardinality.Cardinality.UNARY_UNARY: 'event_unary_unary',
- cardinality.Cardinality.UNARY_STREAM: 'event_unary_stream',
- cardinality.Cardinality.STREAM_UNARY: 'event_stream_unary',
- cardinality.Cardinality.STREAM_STREAM: 'event_stream_stream',
-}
-
-_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = {
- cardinality.Cardinality.UNARY_UNARY: 'unary_unary',
- cardinality.Cardinality.UNARY_STREAM: 'unary_stream',
- cardinality.Cardinality.STREAM_UNARY: 'stream_unary',
- cardinality.Cardinality.STREAM_STREAM: 'stream_stream',
-}
-
-
-class Invoker(six.with_metaclass(abc.ABCMeta)):
- """A type used to invoke test RPCs."""
-
- @abc.abstractmethod
- def blocking(self, group, name):
- """Invokes an RPC with blocking control flow."""
- raise NotImplementedError()
-
- @abc.abstractmethod
- def future(self, group, name):
- """Invokes an RPC with future control flow."""
- raise NotImplementedError()
-
- @abc.abstractmethod
- def event(self, group, name):
- """Invokes an RPC with event control flow."""
- raise NotImplementedError()
-
-
-class InvokerConstructor(six.with_metaclass(abc.ABCMeta)):
- """A type used to create Invokers."""
-
- @abc.abstractmethod
- def name(self):
- """Specifies the name of the Invoker constructed by this object."""
- raise NotImplementedError()
-
- @abc.abstractmethod
- def construct_invoker(self, generic_stub, dynamic_stubs, methods):
- """Constructs an Invoker for the given stubs and methods."""
- raise NotImplementedError()
-
-
-class _GenericInvoker(Invoker):
-
- def __init__(self, generic_stub, methods):
- self._stub = generic_stub
- self._methods = methods
-
- def _behavior(self, group, name, cardinality_to_generic_method):
- method_cardinality = self._methods[group, name].cardinality()
- behavior = getattr(self._stub,
- cardinality_to_generic_method[method_cardinality])
- return lambda *args, **kwargs: behavior(group, name, *args, **kwargs)
-
- def blocking(self, group, name):
- return self._behavior(group, name,
- _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR)
-
- def future(self, group, name):
- return self._behavior(group, name,
- _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR)
-
- def event(self, group, name):
- return self._behavior(group, name,
- _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR)
-
-
-class _GenericInvokerConstructor(InvokerConstructor):
-
- def name(self):
- return 'GenericInvoker'
-
- def construct_invoker(self, generic_stub, dynamic_stub, methods):
- return _GenericInvoker(generic_stub, methods)
-
-
-class _MultiCallableInvoker(Invoker):
-
- def __init__(self, generic_stub, methods):
- self._stub = generic_stub
- self._methods = methods
-
- def _multi_callable(self, group, name):
- method_cardinality = self._methods[group, name].cardinality()
- behavior = getattr(
- self._stub,
- _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
- return behavior(group, name)
-
- def blocking(self, group, name):
- return self._multi_callable(group, name)
-
- def future(self, group, name):
- method_cardinality = self._methods[group, name].cardinality()
- behavior = getattr(
- self._stub,
- _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
- if method_cardinality in (cardinality.Cardinality.UNARY_UNARY,
- cardinality.Cardinality.STREAM_UNARY):
- return behavior(group, name).future
- else:
- return behavior(group, name)
-
- def event(self, group, name):
- return self._multi_callable(group, name).event
-
-
-class _MultiCallableInvokerConstructor(InvokerConstructor):
-
- def name(self):
- return 'MultiCallableInvoker'
-
- def construct_invoker(self, generic_stub, dynamic_stub, methods):
- return _MultiCallableInvoker(generic_stub, methods)
-
-
-class _DynamicInvoker(Invoker):
-
- def __init__(self, dynamic_stubs, methods):
- self._stubs = dynamic_stubs
- self._methods = methods
-
- def blocking(self, group, name):
- return getattr(self._stubs[group], name)
-
- def future(self, group, name):
- if self._methods[group, name].cardinality() in (
- cardinality.Cardinality.UNARY_UNARY,
- cardinality.Cardinality.STREAM_UNARY):
- return getattr(self._stubs[group], name).future
- else:
- return getattr(self._stubs[group], name)
-
- def event(self, group, name):
- return getattr(self._stubs[group], name).event
-
-
-class _DynamicInvokerConstructor(InvokerConstructor):
-
- def name(self):
- return 'DynamicInvoker'
-
- def construct_invoker(self, generic_stub, dynamic_stubs, methods):
- return _DynamicInvoker(dynamic_stubs, methods)
-
-
-def invoker_constructors():
- """Creates a sequence of InvokerConstructors to use in tests of RPCs.
-
- Returns:
- A sequence of InvokerConstructors.
- """
- return (
- _GenericInvokerConstructor(),
- _MultiCallableInvokerConstructor(),
- _DynamicInvokerConstructor(),
- )
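All of the invokers above resolve a stub behavior the same way: map the RPC's cardinality to an attribute name, fetch it with getattr, and curry in the group and method. A minimal sketch of that dispatch with a hypothetical stub and a plain dict standing in for the cardinality enum:

    # Hypothetical stand-ins; the real code uses cardinality.Cardinality and a
    # generic Face-layer stub.
    UNARY_UNARY = 'UNARY_UNARY'
    STREAM_UNARY = 'STREAM_UNARY'

    _CARDINALITY_TO_BLOCKING_ATTRIBUTE = {
        UNARY_UNARY: 'blocking_unary_unary',
        STREAM_UNARY: 'blocking_stream_unary',
    }


    class _FakeStub(object):

        def blocking_unary_unary(self, group, method, request, timeout):
            return 'response to %r on (%s, %s)' % (request, group, method)


    def blocking_behavior(stub, methods, group, method):
        # Look up the method's cardinality, pick the matching stub attribute,
        # and return a callable with group and method already bound.
        attribute = _CARDINALITY_TO_BLOCKING_ATTRIBUTE[methods[group, method]]
        behavior = getattr(stub, attribute)
        return lambda *args, **kwargs: behavior(group, method, *args, **kwargs)


    methods = {('math', 'Add'): UNARY_UNARY}
    invoke = blocking_behavior(_FakeStub(), methods, 'math', 'Add')
    print(invoke('1+1', 30))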
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py
deleted file mode 100644
index f1c96b6dc5..0000000000
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py
+++ /dev/null
@@ -1,304 +0,0 @@
-# Copyright 2015 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Private interfaces implemented by data sets used in Face-layer tests."""
-
-import abc
-
-import six
-
-# face is referenced from specification in this module.
-from grpc.framework.interfaces.face import face # pylint: disable=unused-import
-from tests.unit.framework.interfaces.face import test_interfaces
-
-
-class UnaryUnaryTestMethodImplementation(
- six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
- """A controllable implementation of a unary-unary method."""
-
- @abc.abstractmethod
- def service(self, request, response_callback, context, control):
- """Services an RPC that accepts one message and produces one message.
-
- Args:
- request: The single request message for the RPC.
- response_callback: A callback to be called to accept the response message
- of the RPC.
-      context: A face.ServicerContext object.
- control: A test_control.Control to control execution of this method.
-
- Raises:
- abandonment.Abandoned: May or may not be raised when the RPC has been
- aborted.
- """
- raise NotImplementedError()
-
-
-class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for unary-request-unary-response message pairings."""
-
- @abc.abstractmethod
- def request(self):
- """Affords a request message.
-
- Implementations of this method should return a different message with each
- call so that multiple test executions of the test method may be made with
- different inputs.
-
- Returns:
- A request message.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def verify(self, request, response, test_case):
- """Verifies that the computed response matches the given request.
-
- Args:
- request: A request message.
- response: A response message.
- test_case: A unittest.TestCase object affording useful assertion methods.
-
- Raises:
- AssertionError: If the request and response do not match, indicating that
- there was some problem executing the RPC under test.
- """
- raise NotImplementedError()
-
-
-class UnaryStreamTestMethodImplementation(
- six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
- """A controllable implementation of a unary-stream method."""
-
- @abc.abstractmethod
- def service(self, request, response_consumer, context, control):
- """Services an RPC that takes one message and produces a stream of messages.
-
- Args:
- request: The single request message for the RPC.
- response_consumer: A stream.Consumer to be called to accept the response
- messages of the RPC.
- context: A face.ServicerContext object.
- control: A test_control.Control to control execution of this method.
-
- Raises:
- abandonment.Abandoned: May or may not be raised when the RPC has been
- aborted.
- """
- raise NotImplementedError()
-
-
-class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for unary-request-stream-response message pairings."""
-
- @abc.abstractmethod
- def request(self):
- """Affords a request message.
-
- Implementations of this method should return a different message with each
- call so that multiple test executions of the test method may be made with
- different inputs.
-
- Returns:
- A request message.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def verify(self, request, responses, test_case):
- """Verifies that the computed responses match the given request.
-
- Args:
- request: A request message.
- responses: A sequence of response messages.
- test_case: A unittest.TestCase object affording useful assertion methods.
-
- Raises:
- AssertionError: If the request and responses do not match, indicating that
- there was some problem executing the RPC under test.
- """
- raise NotImplementedError()
-
-
-class StreamUnaryTestMethodImplementation(
- six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
- """A controllable implementation of a stream-unary method."""
-
- @abc.abstractmethod
- def service(self, response_callback, context, control):
- """Services an RPC that takes a stream of messages and produces one message.
-
- Args:
- response_callback: A callback to be called to accept the response message
- of the RPC.
- context: A face.ServicerContext object.
- control: A test_control.Control to control execution of this method.
-
- Returns:
- A stream.Consumer with which to accept the request messages of the RPC.
- The consumer returned from this method may or may not be invoked to
- completion: in the case of RPC abortion, RPC Framework will simply stop
- passing messages to this object. Implementations must not assume that
- this object will be called to completion of the request stream or even
- called at all.
-
- Raises:
- abandonment.Abandoned: May or may not be raised when the RPC has been
- aborted.
- """
- raise NotImplementedError()
-
-
-class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for stream-request-unary-response message pairings."""
-
- @abc.abstractmethod
- def requests(self):
- """Affords a sequence of request messages.
-
-    Implementations of this method should return a different sequence with each
- call so that multiple test executions of the test method may be made with
- different inputs.
-
- Returns:
- A sequence of request messages.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def verify(self, requests, response, test_case):
- """Verifies that the computed response matches the given requests.
-
- Args:
- requests: A sequence of request messages.
- response: A response message.
- test_case: A unittest.TestCase object affording useful assertion methods.
-
- Raises:
- AssertionError: If the requests and response do not match, indicating that
- there was some problem executing the RPC under test.
- """
- raise NotImplementedError()
-
-
-class StreamStreamTestMethodImplementation(
- six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
- """A controllable implementation of a stream-stream method."""
-
- @abc.abstractmethod
- def service(self, response_consumer, context, control):
- """Services an RPC that accepts and produces streams of messages.
-
- Args:
- response_consumer: A stream.Consumer to be called to accept the response
- messages of the RPC.
- context: A face.ServicerContext object.
- control: A test_control.Control to control execution of this method.
-
- Returns:
- A stream.Consumer with which to accept the request messages of the RPC.
- The consumer returned from this method may or may not be invoked to
- completion: in the case of RPC abortion, RPC Framework will simply stop
- passing messages to this object. Implementations must not assume that
- this object will be called to completion of the request stream or even
- called at all.
-
- Raises:
- abandonment.Abandoned: May or may not be raised when the RPC has been
- aborted.
- """
- raise NotImplementedError()
-
-
-class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for stream-request-stream-response message pairings."""
-
- @abc.abstractmethod
- def requests(self):
- """Affords a sequence of request messages.
-
-    Implementations of this method should return a different sequence with each
- call so that multiple test executions of the test method may be made with
- different inputs.
-
- Returns:
- A sequence of request messages.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def verify(self, requests, responses, test_case):
-    """Verifies that the computed responses match the given requests.
-
- Args:
- requests: A sequence of request messages.
- responses: A sequence of response messages.
- test_case: A unittest.TestCase object affording useful assertion methods.
-
- Raises:
- AssertionError: If the requests and responses do not match, indicating
- that there was some problem executing the RPC under test.
- """
- raise NotImplementedError()
-
-
-class TestService(six.with_metaclass(abc.ABCMeta)):
- """A specification of implemented methods to use in tests."""
-
- @abc.abstractmethod
- def unary_unary_scenarios(self):
- """Affords unary-request-unary-response test methods and their messages.
-
- Returns:
- A dict from method group-name pair to implementation/messages pair. The
- first element of the pair is a UnaryUnaryTestMethodImplementation object
-      and the second element is a sequence of UnaryUnaryTestMessages
- objects.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def unary_stream_scenarios(self):
- """Affords unary-request-stream-response test methods and their messages.
-
- Returns:
- A dict from method group-name pair to implementation/messages pair. The
- first element of the pair is a UnaryStreamTestMethodImplementation
- object and the second element is a sequence of
-      UnaryStreamTestMessages objects.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def stream_unary_scenarios(self):
- """Affords stream-request-unary-response test methods and their messages.
-
- Returns:
- A dict from method group-name pair to implementation/messages pair. The
- first element of the pair is a StreamUnaryTestMethodImplementation
- object and the second element is a sequence of
-      StreamUnaryTestMessages objects.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def stream_stream_scenarios(self):
- """Affords stream-request-stream-response test methods and their messages.
-
- Returns:
- A dict from method group-name pair to implementation/messages pair. The
- first element of the pair is a StreamStreamTestMethodImplementation
- object and the second element is a sequence of
-      StreamStreamTestMessages objects.
- """
- raise NotImplementedError()
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py
deleted file mode 100644
index a84e02a79a..0000000000
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py
+++ /dev/null
@@ -1,390 +0,0 @@
-# Copyright 2015 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Examples of Python implementations of the stock.proto Stock service."""
-
-from grpc.framework.common import cardinality
-from grpc.framework.foundation import abandonment
-from grpc.framework.foundation import stream
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.face import _service
-from tests.unit._junkdrawer import stock_pb2
-
-_STOCK_GROUP_NAME = 'Stock'
-_SYMBOL_FORMAT = 'test symbol:%03d'
-
-# A test-appropriate security-pricing function. :-P
-_price = lambda symbol_name: float(hash(symbol_name) % 4096)
-
-
-def _get_last_trade_price(stock_request, stock_reply_callback, control, active):
- """A unary-request, unary-response test method."""
- control.control()
- if active():
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=stock_request.symbol,
- price=_price(stock_request.symbol)))
- else:
- raise abandonment.Abandoned()
-
-
-def _get_last_trade_price_multiple(stock_reply_consumer, control, active):
- """A stream-request, stream-response test method."""
-
- def stock_reply_for_stock_request(stock_request):
- control.control()
- if active():
- return stock_pb2.StockReply(
- symbol=stock_request.symbol, price=_price(stock_request.symbol))
- else:
- raise abandonment.Abandoned()
-
- class StockRequestConsumer(stream.Consumer):
-
- def consume(self, stock_request):
- stock_reply_consumer.consume(
- stock_reply_for_stock_request(stock_request))
-
- def terminate(self):
- control.control()
- stock_reply_consumer.terminate()
-
- def consume_and_terminate(self, stock_request):
- stock_reply_consumer.consume_and_terminate(
- stock_reply_for_stock_request(stock_request))
-
- return StockRequestConsumer()
-
-
-def _watch_future_trades(stock_request, stock_reply_consumer, control, active):
- """A unary-request, stream-response test method."""
- base_price = _price(stock_request.symbol)
- for index in range(stock_request.num_trades_to_watch):
- control.control()
- if active():
- stock_reply_consumer.consume(
- stock_pb2.StockReply(
- symbol=stock_request.symbol, price=base_price + index))
- else:
- raise abandonment.Abandoned()
- stock_reply_consumer.terminate()
-
-
-def _get_highest_trade_price(stock_reply_callback, control, active):
- """A stream-request, unary-response test method."""
-
- class StockRequestConsumer(stream.Consumer):
- """Keeps an ongoing record of the most valuable symbol yet consumed."""
-
- def __init__(self):
- self._symbol = None
- self._price = None
-
- def consume(self, stock_request):
- control.control()
- if active():
- if self._price is None:
- self._symbol = stock_request.symbol
- self._price = _price(stock_request.symbol)
- else:
- candidate_price = _price(stock_request.symbol)
- if self._price < candidate_price:
- self._symbol = stock_request.symbol
- self._price = candidate_price
-
- def terminate(self):
- control.control()
- if active():
- if self._symbol is None:
- raise ValueError()
- else:
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=self._symbol, price=self._price))
- self._symbol = None
- self._price = None
-
- def consume_and_terminate(self, stock_request):
- control.control()
- if active():
- if self._price is None:
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=stock_request.symbol,
- price=_price(stock_request.symbol)))
- else:
- candidate_price = _price(stock_request.symbol)
- if self._price < candidate_price:
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=stock_request.symbol,
- price=candidate_price))
- else:
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=self._symbol, price=self._price))
-
- self._symbol = None
- self._price = None
-
- return StockRequestConsumer()
-
-
-class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation):
- """GetLastTradePrice for use in tests."""
-
- def group(self):
- return _STOCK_GROUP_NAME
-
- def name(self):
- return 'GetLastTradePrice'
-
- def cardinality(self):
- return cardinality.Cardinality.UNARY_UNARY
-
- def request_class(self):
- return stock_pb2.StockRequest
-
- def response_class(self):
- return stock_pb2.StockReply
-
- def serialize_request(self, request):
- return request.SerializeToString()
-
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
-
- def serialize_response(self, response):
- return response.SerializeToString()
-
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
-
- def service(self, request, response_callback, context, control):
- _get_last_trade_price(request, response_callback, control,
- context.is_active)
-
-
-class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages):
-
- def __init__(self):
- self._index = 0
-
- def request(self):
- symbol = _SYMBOL_FORMAT % self._index
- self._index += 1
- return stock_pb2.StockRequest(symbol=symbol)
-
- def verify(self, request, response, test_case):
- test_case.assertEqual(request.symbol, response.symbol)
- test_case.assertEqual(_price(request.symbol), response.price)
-
-
-class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation):
- """GetLastTradePriceMultiple for use in tests."""
-
- def group(self):
- return _STOCK_GROUP_NAME
-
- def name(self):
- return 'GetLastTradePriceMultiple'
-
- def cardinality(self):
- return cardinality.Cardinality.STREAM_STREAM
-
- def request_class(self):
- return stock_pb2.StockRequest
-
- def response_class(self):
- return stock_pb2.StockReply
-
- def serialize_request(self, request):
- return request.SerializeToString()
-
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
-
- def serialize_response(self, response):
- return response.SerializeToString()
-
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
-
- def service(self, response_consumer, context, control):
- return _get_last_trade_price_multiple(response_consumer, control,
- context.is_active)
-
-
-class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages):
- """Pairs of message streams for use with GetLastTradePriceMultiple."""
-
- def __init__(self):
- self._index = 0
-
- def requests(self):
- base_index = self._index
- self._index += 1
- return [
- stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index))
- for index in range(test_constants.STREAM_LENGTH)
- ]
-
- def verify(self, requests, responses, test_case):
- test_case.assertEqual(len(requests), len(responses))
- for stock_request, stock_reply in zip(requests, responses):
- test_case.assertEqual(stock_request.symbol, stock_reply.symbol)
- test_case.assertEqual(
- _price(stock_request.symbol), stock_reply.price)
-
-
-class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation):
- """WatchFutureTrades for use in tests."""
-
- def group(self):
- return _STOCK_GROUP_NAME
-
- def name(self):
- return 'WatchFutureTrades'
-
- def cardinality(self):
- return cardinality.Cardinality.UNARY_STREAM
-
- def request_class(self):
- return stock_pb2.StockRequest
-
- def response_class(self):
- return stock_pb2.StockReply
-
- def serialize_request(self, request):
- return request.SerializeToString()
-
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
-
- def serialize_response(self, response):
- return response.SerializeToString()
-
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
-
- def service(self, request, response_consumer, context, control):
- _watch_future_trades(request, response_consumer, control,
- context.is_active)
-
-
-class WatchFutureTradesMessages(_service.UnaryStreamTestMessages):
- """Pairs of a single request message and a sequence of response messages."""
-
- def __init__(self):
- self._index = 0
-
- def request(self):
- symbol = _SYMBOL_FORMAT % self._index
- self._index += 1
- return stock_pb2.StockRequest(
- symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH)
-
- def verify(self, request, responses, test_case):
- test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses))
- base_price = _price(request.symbol)
- for index, response in enumerate(responses):
- test_case.assertEqual(base_price + index, response.price)
-
-
-class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation):
- """GetHighestTradePrice for use in tests."""
-
- def group(self):
- return _STOCK_GROUP_NAME
-
- def name(self):
- return 'GetHighestTradePrice'
-
- def cardinality(self):
- return cardinality.Cardinality.STREAM_UNARY
-
- def request_class(self):
- return stock_pb2.StockRequest
-
- def response_class(self):
- return stock_pb2.StockReply
-
- def serialize_request(self, request):
- return request.SerializeToString()
-
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
-
- def serialize_response(self, response):
- return response.SerializeToString()
-
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
-
- def service(self, response_callback, context, control):
- return _get_highest_trade_price(response_callback, control,
- context.is_active)
-
-
-class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages):
-
- def requests(self):
- return [
- stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index)
- for index in range(test_constants.STREAM_LENGTH)
- ]
-
- def verify(self, requests, response, test_case):
- price = None
- symbol = None
- for stock_request in requests:
- current_symbol = stock_request.symbol
- current_price = _price(current_symbol)
- if price is None or price < current_price:
- price = current_price
- symbol = current_symbol
- test_case.assertEqual(price, response.price)
- test_case.assertEqual(symbol, response.symbol)
-
-
-class StockTestService(_service.TestService):
- """A corpus of test data with one method of each RPC cardinality."""
-
- def unary_unary_scenarios(self):
- return {
- (_STOCK_GROUP_NAME, 'GetLastTradePrice'):
- (GetLastTradePrice(), [GetLastTradePriceMessages()]),
- }
-
- def unary_stream_scenarios(self):
- return {
- (_STOCK_GROUP_NAME, 'WatchFutureTrades'):
- (WatchFutureTrades(), [WatchFutureTradesMessages()]),
- }
-
- def stream_unary_scenarios(self):
- return {
- (_STOCK_GROUP_NAME, 'GetHighestTradePrice'):
- (GetHighestTradePrice(), [GetHighestTradePriceMessages()])
- }
-
- def stream_stream_scenarios(self):
- return {
- (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'):
- (GetLastTradePriceMultiple(),
- [GetLastTradePriceMultipleMessages()]),
- }
-
-
-STOCK_TEST_SERVICE = StockTestService()
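As an aside (editorial sketch, not part of the deleted file above): one way a scenario entry from this stock test service could be driven directly, assuming the module's STOCK_TEST_SERVICE and _STOCK_GROUP_NAME names and substituting hypothetical stand-ins for the test_control.Control and face.ServicerContext collaborators.

import unittest


class _NoOpControl(object):
    """Hypothetical stand-in for the test_control.Control collaborator."""

    def control(self):
        pass


class _AlwaysActiveContext(object):
    """Hypothetical stand-in for the face.ServicerContext collaborator."""

    def is_active(self):
        return True


class DirectStockServiceTest(unittest.TestCase):

    def test_get_last_trade_price(self):
        method, message_sequences = STOCK_TEST_SERVICE.unary_unary_scenarios()[
            (_STOCK_GROUP_NAME, 'GetLastTradePrice')]
        for messages in message_sequences:
            request = messages.request()
            responses = []
            # A plain list append plays the role of the response callback.
            method.service(request, responses.append, _AlwaysActiveContext(),
                           _NoOpControl())
            messages.verify(request, responses[0], self)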
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py
deleted file mode 100644
index cff4b7cdea..0000000000
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Copyright 2015 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Tools for creating tests of implementations of the Face layer."""
-
-# unittest is referenced from specification in this module.
-import unittest # pylint: disable=unused-import
-
-# test_interfaces is referenced from specification in this module.
-from tests.unit.framework.interfaces.face import _blocking_invocation_inline_service
-from tests.unit.framework.interfaces.face import _future_invocation_asynchronous_event_service
-from tests.unit.framework.interfaces.face import _invocation
-from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
-
-_TEST_CASE_SUPERCLASSES = (
- _blocking_invocation_inline_service.TestCase,
- _future_invocation_asynchronous_event_service.TestCase,
-)
-
-
-def test_cases(implementation):
- """Creates unittest.TestCase classes for a given Face layer implementation.
-
- Args:
- implementation: A test_interfaces.Implementation specifying creation and
- destruction of a given Face layer implementation.
-
- Returns:
- A sequence of subclasses of unittest.TestCase defining tests of the
- specified Face layer implementation.
- """
- test_case_classes = []
- for invoker_constructor in _invocation.invoker_constructors():
- for super_class in _TEST_CASE_SUPERCLASSES:
- test_case_classes.append(
- type(
- invoker_constructor.name() + super_class.NAME,
- (super_class,), {
- 'implementation': implementation,
- 'invoker_constructor': invoker_constructor,
- '__module__': implementation.__module__,
- }))
- return test_case_classes
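As an aside (editorial sketch, not part of the deleted file above): one way the classes produced by test_cases() could be collected into a runnable suite; my_implementation below stands for a hypothetical test_interfaces.Implementation subclass.

import unittest


def load_face_test_suite(implementation):
    """Collects every generated test case class into a single unittest suite."""
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for test_case_class in test_cases(implementation):
        suite.addTests(loader.loadTestsFromTestCase(test_case_class))
    return suite


# For example: unittest.TextTestRunner().run(load_face_test_suite(my_implementation))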
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py
deleted file mode 100644
index d0de8e1c54..0000000000
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py
+++ /dev/null
@@ -1,212 +0,0 @@
-# Copyright 2015 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Interfaces used in tests of implementations of the Face layer."""
-
-import abc
-
-import six
-
-from grpc.framework.common import cardinality # pylint: disable=unused-import
-from grpc.framework.interfaces.face import face # pylint: disable=unused-import
-
-
-class Method(six.with_metaclass(abc.ABCMeta)):
- """Specifies a method to be used in tests."""
-
- @abc.abstractmethod
- def group(self):
- """Identify the group of the method.
-
- Returns:
- The group of the method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def name(self):
- """Identify the name of the method.
-
- Returns:
- The name of the method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def cardinality(self):
- """Identify the cardinality of the method.
-
- Returns:
- A cardinality.Cardinality value describing the streaming semantics of the
- method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def request_class(self):
- """Identify the class used for the method's request objects.
-
- Returns:
- The class object of the class to which the method's request objects
- belong.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def response_class(self):
- """Identify the class used for the method's response objects.
-
- Returns:
- The class object of the class to which the method's response objects
- belong.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def serialize_request(self, request):
- """Serialize the given request object.
-
- Args:
- request: A request object appropriate for this method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def deserialize_request(self, serialized_request):
- """Synthesize a request object from a given bytestring.
-
- Args:
- serialized_request: A bytestring deserializable into a request object
- appropriate for this method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def serialize_response(self, response):
- """Serialize the given response object.
-
- Args:
- response: A response object appropriate for this method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def deserialize_response(self, serialized_response):
- """Synthesize a response object from a given bytestring.
-
- Args:
- serialized_response: A bytestring deserializable into a response object
- appropriate for this method.
- """
- raise NotImplementedError()
-
-
-class Implementation(six.with_metaclass(abc.ABCMeta)):
- """Specifies an implementation of the Face layer."""
-
- @abc.abstractmethod
- def instantiate(self, methods, method_implementations,
- multi_method_implementation):
- """Instantiates the Face layer implementation to be used in a test.
-
- Args:
- methods: A sequence of Method objects describing the methods available to
- be called during the test.
- method_implementations: A dictionary from group-name pair to
- face.MethodImplementation object specifying implementation of a method.
- multi_method_implementation: A face.MultiMethodImplementation or None.
-
- Returns:
-      A sequence of length three, the first element of which is a
-      face.GenericStub, the second element of which is a dictionary from groups
- to face.DynamicStubs affording invocation of the group's methods, and
- the third element of which is an arbitrary memo object to be kept and
- passed to destantiate at the conclusion of the test. The returned stubs
- must be backed by the provided implementations.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def destantiate(self, memo):
- """Destroys the Face layer implementation under test.
-
- Args:
- memo: The object from the third position of the return value of a call to
- instantiate.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def invocation_metadata(self):
- """Provides the metadata to be used when invoking a test RPC.
-
- Returns:
- An object to use as the supplied-at-invocation-time metadata in a test
- RPC.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def initial_metadata(self):
- """Provides the metadata for use as a test RPC's first servicer metadata.
-
- Returns:
- An object to use as the from-the-servicer-before-responses metadata in a
- test RPC.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def terminal_metadata(self):
- """Provides the metadata for use as a test RPC's second servicer metadata.
-
- Returns:
- An object to use as the from-the-servicer-after-all-responses metadata in
- a test RPC.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def code(self):
- """Provides the value for use as a test RPC's code.
-
- Returns:
- An object to use as the from-the-servicer code in a test RPC.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def details(self):
- """Provides the value for use as a test RPC's details.
-
- Returns:
- An object to use as the from-the-servicer details in a test RPC.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- """Identifies whether or not metadata was properly transmitted.
-
- Args:
- original_metadata: A metadata value passed to the Face interface
- implementation under test.
- transmitted_metadata: The same metadata value after having been
- transmitted via an RPC performed by the Face interface implementation
- under test.
-
- Returns:
- Whether or not the metadata was properly transmitted by the Face interface
- implementation under test.
- """
- raise NotImplementedError()
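As an aside (editorial sketch, not part of the deleted file above): a hypothetical skeleton showing the shape an Implementation subclass needs to have; every value below is a placeholder and does not reflect any actual gRPC-backed implementation.

class _ExampleImplementation(Implementation):
    """Hypothetical skeleton satisfying the abstract interface above."""

    def instantiate(self, methods, method_implementations,
                    multi_method_implementation):
        generic_stub = None  # placeholder for a face.GenericStub backed by the implementations
        dynamic_stubs = {}  # placeholder mapping from group to face.DynamicStub
        memo = None  # anything destantiate() later needs for teardown
        return generic_stub, dynamic_stubs, memo

    def destantiate(self, memo):
        pass  # tear down whatever instantiate() created

    def invocation_metadata(self):
        return (('test-invocation-key', 'test-invocation-value'),)

    def initial_metadata(self):
        return (('test-initial-key', 'test-initial-value'),)

    def terminal_metadata(self):
        return (('test-terminal-key', 'test-terminal-value'),)

    def code(self):
        return None  # placeholder for a servicer-supplied status code

    def details(self):
        return 'test details'

    def metadata_transmitted(self, original_metadata, transmitted_metadata):
        # Placeholder check: every original key/value pair must reappear.
        return all(pair in transmitted_metadata for pair in original_metadata)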