aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio')
-rw-r--r--src/python/grpcio/commands.py2
-rw-r--r--src/python/grpcio/grpc/__init__.py111
-rw-r--r--src/python/grpcio/grpc/_adapter/_types.py2
-rw-r--r--src/python/grpcio/grpc/_channel.py39
-rw-r--r--src/python/grpcio/grpc/_common.py4
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.c4
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.h6
-rw-r--r--src/python/grpcio/grpc/beta/_client_adaptations.py8
-rw-r--r--src/python/grpcio/grpc/beta/interfaces.py3
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py4
-rw-r--r--src/python/grpcio/tests/qps/benchmark_client.py9
-rw-r--r--src/python/grpcio/tests/tests.json35
-rw-r--r--src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py429
-rw-r--r--src/python/grpcio/tests/unit/_adapter/_low_test.py319
-rw-r--r--src/python/grpcio/tests/unit/_api_test.py111
-rw-r--r--src/python/grpcio/tests/unit/_channel_connectivity_test.py4
-rw-r--r--src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py157
-rw-r--r--src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py163
-rw-r--r--src/python/grpcio/tests/unit/_from_grpc_import_star.py38
-rw-r--r--src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py88
-rw-r--r--src/python/grpcio/tests/unit/_links/_transmission_test.py239
-rw-r--r--src/python/grpcio/tests/unit/_metadata_test.py216
-rw-r--r--src/python/grpcio/tests/unit/_rpc_test.py21
-rw-r--r--src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py10
-rw-r--r--src/python/grpcio/tests/unit/beta/test_utilities.py2
-rw-r--r--src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py113
-rw-r--r--src/python/grpcio/tests/unit/framework/core/_base_interface_test.py96
-rw-r--r--src/python/grpcio/tests/unit/framework/foundation/_later_test.py151
29 files changed, 553 insertions, 1833 deletions
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py
index 3e974eba0a..f498ed4190 100644
--- a/src/python/grpcio/commands.py
+++ b/src/python/grpcio/commands.py
@@ -182,6 +182,8 @@ class BuildProtoModules(setuptools.Command):
'--plugin=protoc-gen-python-grpc={}'.format(
self.grpc_python_plugin_command),
'-I {}'.format(GRPC_STEM),
+ '-I .',
+ '-I {}/third_party/protobuf/src'.format(GRPC_STEM),
'--python_out={}'.format(PROTO_GEN_STEM),
'--python-grpc_out={}'.format(PROTO_GEN_STEM),
] + [path]
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 2fbb9f45e6..b3eeaad1f7 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -29,8 +29,6 @@
"""gRPC's Python API."""
-__import__('pkg_resources').declare_namespace(__name__)
-
import abc
import enum
@@ -212,14 +210,14 @@ class ChannelConnectivity(enum.Enum):
READY: The channel is ready to conduct RPCs.
TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
recover.
- FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
+ SHUTDOWN: The channel has seen a failure from which it cannot recover.
"""
IDLE = (_cygrpc.ConnectivityState.idle, 'idle')
CONNECTING = (_cygrpc.ConnectivityState.connecting, 'connecting')
READY = (_cygrpc.ConnectivityState.ready, 'ready')
TRANSIENT_FAILURE = (
_cygrpc.ConnectivityState.transient_failure, 'transient failure')
- FATAL_FAILURE = (_cygrpc.ConnectivityState.fatal_failure, 'fatal failure')
+ SHUTDOWN = (_cygrpc.ConnectivityState.shutdown, 'shutdown')
@enum.unique
@@ -438,9 +436,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
"""Affords invoking a unary-unary RPC."""
@abc.abstractmethod
- def __call__(
- self, request, timeout=None, metadata=None, credentials=None,
- with_call=False):
+ def __call__(self, request, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
Args:
@@ -449,12 +445,30 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
- with_call: Whether or not to include return a Call for the RPC in addition
- to the response.
Returns:
- The response value for the RPC, and a Call for the RPC if with_call was
- set to True at invocation.
+ The response value for the RPC.
+
+ Raises:
+ RpcError: Indicating that the RPC terminated with non-OK status. The
+ raised RpcError will also be a Call for the RPC affording the RPC's
+ metadata, status code, and details.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def with_call(self, request, timeout=None, metadata=None, credentials=None):
+ """Synchronously invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ timeout: An optional durating of time in seconds to allow for the RPC.
+ metadata: An optional sequence of pairs of bytes to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
+
+ Returns:
+ The response value for the RPC and a Call value for the RPC.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
@@ -510,8 +524,7 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
@abc.abstractmethod
def __call__(
- self, request_iterator, timeout=None, metadata=None, credentials=None,
- with_call=False):
+ self, request_iterator, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
Args:
@@ -520,8 +533,6 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
- with_call: Whether or not to include return a Call for the RPC in addition
- to the response.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
@@ -535,6 +546,28 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
raise NotImplementedError()
@abc.abstractmethod
+ def with_call(
+ self, request_iterator, timeout=None, metadata=None, credentials=None):
+ """Synchronously invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: An optional duration of time in seconds to allow for the RPC.
+ metadata: An optional sequence of pairs of bytes to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
+
+ Returns:
+ The response value for the RPC and a Call for the RPC.
+
+ Raises:
+ RpcError: Indicating that the RPC terminated with non-OK status. The
+ raised RpcError will also be a Call for the RPC affording the RPC's
+ metadata, status code, and details.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
def future(
self, request_iterator, timeout=None, metadata=None, credentials=None):
"""Asynchronously invokes the underlying RPC.
@@ -1171,7 +1204,7 @@ def secure_channel(target, credentials, options=None):
A Channel to the target through which RPCs may be conducted.
"""
from grpc import _channel
- return _channel.Channel(target, options, credentials)
+ return _channel.Channel(target, options, credentials._credentials)
def server(generic_rpc_handlers, thread_pool, options=None):
@@ -1193,3 +1226,49 @@ def server(generic_rpc_handlers, thread_pool, options=None):
"""
from grpc import _server
return _server.Server(generic_rpc_handlers, thread_pool)
+
+
+################################### __all__ #################################
+
+
+__all__ = (
+ 'FutureTimeoutError',
+ 'FutureCancelledError',
+ 'Future',
+ 'ChannelConnectivity',
+ 'StatusCode',
+ 'RpcError',
+ 'RpcContext',
+ 'Call',
+ 'ChannelCredentials',
+ 'CallCredentials',
+ 'AuthMetadataContext',
+ 'AuthMetadataPluginCallback',
+ 'AuthMetadataPlugin',
+ 'ServerCredentials',
+ 'UnaryUnaryMultiCallable',
+ 'UnaryStreamMultiCallable',
+ 'StreamUnaryMultiCallable',
+ 'StreamStreamMultiCallable',
+ 'Channel',
+ 'ServicerContext',
+ 'RpcMethodHandler',
+ 'HandlerCallDetails',
+ 'GenericRpcHandler',
+ 'Server',
+ 'unary_unary_rpc_method_handler',
+ 'unary_stream_rpc_method_handler',
+ 'stream_unary_rpc_method_handler',
+ 'stream_stream_rpc_method_handler',
+ 'method_handlers_generic_handler',
+ 'ssl_channel_credentials',
+ 'metadata_call_credentials',
+ 'access_token_call_credentials',
+ 'composite_call_credentials',
+ 'composite_channel_credentials',
+ 'ssl_server_credentials',
+ 'channel_ready_future',
+ 'insecure_channel',
+ 'secure_channel',
+ 'server',
+)
diff --git a/src/python/grpcio/grpc/_adapter/_types.py b/src/python/grpcio/grpc/_adapter/_types.py
index f8405949d4..b7cc6fbbb5 100644
--- a/src/python/grpcio/grpc/_adapter/_types.py
+++ b/src/python/grpcio/grpc/_adapter/_types.py
@@ -114,7 +114,7 @@ class ConnectivityState(enum.IntEnum):
CONNECTING = cygrpc.ConnectivityState.connecting
READY = cygrpc.ConnectivityState.ready
TRANSIENT_FAILURE = cygrpc.ConnectivityState.transient_failure
- FATAL_FAILURE = cygrpc.ConnectivityState.fatal_failure
+ FATAL_FAILURE = cygrpc.ConnectivityState.shutdown
class Status(collections.namedtuple(
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index d9315d2e6c..1f34beeb2c 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -449,9 +449,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
)
return state, operations, deadline, deadline_timespec, None
- def __call__(
- self, request, timeout=None, metadata=None, credentials=None,
- with_call=False):
+ def _blocking(self, request, timeout, metadata, credentials):
state, operations, deadline, deadline_timespec, rendezvous = self._prepare(
request, timeout, metadata)
if rendezvous:
@@ -464,7 +462,15 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
call.set_credentials(credentials._credentials)
call.start_batch(cygrpc.Operations(operations), None)
_handle_event(completion_queue.poll(), state, self._response_deserializer)
- return _end_unary_response_blocking(state, with_call, deadline)
+ return state, deadline
+
+ def __call__(self, request, timeout=None, metadata=None, credentials=None):
+ state, deadline, = self._blocking(request, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, False, deadline)
+
+ def with_call(self, request, timeout=None, metadata=None, credentials=None):
+ state, deadline, = self._blocking(request, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, True, deadline)
def future(self, request, timeout=None, metadata=None, credentials=None):
state, operations, deadline, deadline_timespec, rendezvous = self._prepare(
@@ -532,9 +538,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
- def __call__(
- self, request_iterator, timeout=None, metadata=None, credentials=None,
- with_call=False):
+ def _blocking(self, request_iterator, timeout, metadata, credentials):
deadline, deadline_timespec = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
completion_queue = cygrpc.CompletionQueue()
@@ -563,7 +567,19 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
state.condition.notify_all()
if not state.due:
break
- return _end_unary_response_blocking(state, with_call, deadline)
+ return state, deadline
+
+ def __call__(
+ self, request_iterator, timeout=None, metadata=None, credentials=None):
+ state, deadline, = self._blocking(
+ request_iterator, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, False, deadline)
+
+ def with_call(
+ self, request_iterator, timeout=None, metadata=None, credentials=None):
+ state, deadline, = self._blocking(
+ request_iterator, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, True, deadline)
def future(
self, request_iterator, timeout=None, metadata=None, credentials=None):
@@ -814,6 +830,13 @@ def _options(options):
class Channel(grpc.Channel):
def __init__(self, target, options, credentials):
+ """Constructor.
+
+ Args:
+ target: The target to which to connect.
+ options: Configuration options for the channel.
+ credentials: A cygrpc.ChannelCredentials or None.
+ """
self._channel = cygrpc.Channel(target, _options(options), credentials)
self._call_state = _ChannelCallState(self._channel)
self._connectivity_state = _ChannelConnectivityState(self._channel)
diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py
index 1fd1704f18..f351bea9e3 100644
--- a/src/python/grpcio/grpc/_common.py
+++ b/src/python/grpcio/grpc/_common.py
@@ -46,8 +46,8 @@ CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
cygrpc.ConnectivityState.ready: grpc.ChannelConnectivity.READY,
cygrpc.ConnectivityState.transient_failure:
grpc.ChannelConnectivity.TRANSIENT_FAILURE,
- cygrpc.ConnectivityState.fatal_failure:
- grpc.ChannelConnectivity.FATAL_FAILURE,
+ cygrpc.ConnectivityState.shutdown:
+ grpc.ChannelConnectivity.SHUTDOWN,
}
CYGRPC_STATUS_CODE_TO_STATUS_CODE = {
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 2e52953c0a..0055d0d3a2 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -33,7 +33,7 @@ class ConnectivityState:
connecting = GRPC_CHANNEL_CONNECTING
ready = GRPC_CHANNEL_READY
transient_failure = GRPC_CHANNEL_TRANSIENT_FAILURE
- fatal_failure = GRPC_CHANNEL_SHUTDOWN
+ shutdown = GRPC_CHANNEL_SHUTDOWN
class ChannelArgKey:
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c
index 5c49f6cf3e..8437e74ba0 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.c
+++ b/src/python/grpcio/grpc/_cython/imports.generated.c
@@ -260,6 +260,8 @@ gpr_avl_unref_type gpr_avl_unref_import;
gpr_avl_add_type gpr_avl_add_import;
gpr_avl_remove_type gpr_avl_remove_import;
gpr_avl_get_type gpr_avl_get_import;
+gpr_avl_maybe_get_type gpr_avl_maybe_get_import;
+gpr_avl_is_empty_type gpr_avl_is_empty_import;
gpr_cmdline_create_type gpr_cmdline_create_import;
gpr_cmdline_add_int_type gpr_cmdline_add_int_import;
gpr_cmdline_add_flag_type gpr_cmdline_add_flag_import;
@@ -533,6 +535,8 @@ void pygrpc_load_imports(HMODULE library) {
gpr_avl_add_import = (gpr_avl_add_type) GetProcAddress(library, "gpr_avl_add");
gpr_avl_remove_import = (gpr_avl_remove_type) GetProcAddress(library, "gpr_avl_remove");
gpr_avl_get_import = (gpr_avl_get_type) GetProcAddress(library, "gpr_avl_get");
+ gpr_avl_maybe_get_import = (gpr_avl_maybe_get_type) GetProcAddress(library, "gpr_avl_maybe_get");
+ gpr_avl_is_empty_import = (gpr_avl_is_empty_type) GetProcAddress(library, "gpr_avl_is_empty");
gpr_cmdline_create_import = (gpr_cmdline_create_type) GetProcAddress(library, "gpr_cmdline_create");
gpr_cmdline_add_int_import = (gpr_cmdline_add_int_type) GetProcAddress(library, "gpr_cmdline_add_int");
gpr_cmdline_add_flag_import = (gpr_cmdline_add_flag_type) GetProcAddress(library, "gpr_cmdline_add_flag");
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h
index 62865f8a0e..d52e8591b3 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.h
+++ b/src/python/grpcio/grpc/_cython/imports.generated.h
@@ -731,6 +731,12 @@ extern gpr_avl_remove_type gpr_avl_remove_import;
typedef void *(*gpr_avl_get_type)(gpr_avl avl, void *key);
extern gpr_avl_get_type gpr_avl_get_import;
#define gpr_avl_get gpr_avl_get_import
+typedef int(*gpr_avl_maybe_get_type)(gpr_avl avl, void *key, void **value);
+extern gpr_avl_maybe_get_type gpr_avl_maybe_get_import;
+#define gpr_avl_maybe_get gpr_avl_maybe_get_import
+typedef int(*gpr_avl_is_empty_type)(gpr_avl avl);
+extern gpr_avl_is_empty_type gpr_avl_is_empty_import;
+#define gpr_avl_is_empty gpr_avl_is_empty_import
typedef gpr_cmdline *(*gpr_cmdline_create_type)(const char *description);
extern gpr_cmdline_create_type gpr_cmdline_create_import;
#define gpr_cmdline_create gpr_cmdline_create_import
diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py
index 024808c540..56456cc117 100644
--- a/src/python/grpcio/grpc/beta/_client_adaptations.py
+++ b/src/python/grpcio/grpc/beta/_client_adaptations.py
@@ -186,9 +186,9 @@ def _blocking_unary_unary(
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
if with_call:
- response, call = multi_callable(
+ response, call = multi_callable.with_call(
request, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options), with_call=True)
+ credentials=_credentials(protocol_options))
return response, _Rendezvous(None, None, call)
else:
return multi_callable(
@@ -237,9 +237,9 @@ def _blocking_stream_unary(
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
if with_call:
- response, call = multi_callable(
+ response, call = multi_callable.with_call(
request_iterator, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options), with_call=True)
+ credentials=_credentials(protocol_options))
return response, _Rendezvous(None, None, call)
else:
return multi_callable(
diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py
index 4343b6c4b5..90f6bbbfcc 100644
--- a/src/python/grpcio/grpc/beta/interfaces.py
+++ b/src/python/grpcio/grpc/beta/interfaces.py
@@ -36,6 +36,9 @@ import six
import grpc
ChannelConnectivity = grpc.ChannelConnectivity
+# FATAL_FAILURE was a Beta-API name for SHUTDOWN
+ChannelConnectivity.FATAL_FAILURE = ChannelConnectivity.SHUTDOWN
+
StatusCode = grpc.StatusCode
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 2a5229434c..839c555f05 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -45,7 +45,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/support/env_windows.c',
'src/core/lib/support/histogram.c',
'src/core/lib/support/host_port.c',
- 'src/core/lib/support/load_file.c',
'src/core/lib/support/log.c',
'src/core/lib/support/log_android.c',
'src/core/lib/support/log_linux.c',
@@ -94,6 +93,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
+ 'src/core/lib/iomgr/error.c',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_poll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
@@ -103,6 +103,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/iomgr.c',
'src/core/lib/iomgr/iomgr_posix.c',
'src/core/lib/iomgr/iomgr_windows.c',
+ 'src/core/lib/iomgr/load_file.c',
'src/core/lib/iomgr/polling_entity.c',
'src/core/lib/iomgr/pollset_set_windows.c',
'src/core/lib/iomgr/pollset_windows.c',
@@ -205,6 +206,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/security/transport/secure_endpoint.c',
'src/core/lib/security/transport/security_connector.c',
'src/core/lib/security/transport/server_auth_filter.c',
+ 'src/core/lib/security/transport/tsi_error.c',
'src/core/lib/security/util/b64.c',
'src/core/lib/security/util/json_util.c',
'src/core/lib/surface/init_secure.c',
diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py
index 1b100bb168..080281415d 100644
--- a/src/python/grpcio/tests/qps/benchmark_client.py
+++ b/src/python/grpcio/tests/qps/benchmark_client.py
@@ -30,11 +30,13 @@
"""Defines test client behaviors (UNARY/STREAMING) (SYNC/ASYNC)."""
import abc
+import threading
import time
from concurrent import futures
from six.moves import queue
+import grpc
from grpc.beta import implementations
from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import messages_pb2
@@ -62,6 +64,13 @@ class BenchmarkClient:
else:
channel = implementations.insecure_channel(host, port)
+ connected_event = threading.Event()
+ def wait_for_ready(connectivity):
+ if connectivity == grpc.ChannelConnectivity.READY:
+ connected_event.set()
+ channel.subscribe(wait_for_ready, try_to_connect=True)
+ connected_event.wait()
+
if config.payload_config.WhichOneof('payload') == 'simple_params':
self._generic = False
self._stub = services_pb2.beta_create_BenchmarkService_stub(channel)
diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json
index e64d2d04a7..8e509621a8 100644
--- a/src/python/grpcio/tests/tests.json
+++ b/src/python/grpcio/tests/tests.json
@@ -1,10 +1,9 @@
[
+ "_api_test.AllTest",
+ "_api_test.ChannelConnectivityTest",
+ "_api_test.ChannelTest",
"_auth_test.AccessTokenCallCredentialsTest",
"_auth_test.GoogleCallCredentialsTest",
- "_base_interface_test.AsyncEasyTest",
- "_base_interface_test.AsyncPeasyTest",
- "_base_interface_test.SyncEasyTest",
- "_base_interface_test.SyncPeasyTest",
"_beta_features_test.BetaFeaturesTest",
"_beta_features_test.ContextManagementAndLifecycleTest",
"_cancel_many_calls_test.CancelManyCallsTest",
@@ -12,22 +11,7 @@
"_channel_ready_future_test.ChannelReadyFutureTest",
"_channel_test.ChannelTest",
"_connectivity_channel_test.ChannelConnectivityTest",
- "_core_over_links_base_interface_test.AsyncEasyTest",
- "_core_over_links_base_interface_test.AsyncPeasyTest",
- "_core_over_links_base_interface_test.SyncEasyTest",
- "_core_over_links_base_interface_test.SyncPeasyTest",
- "_crust_over_core_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
- "_crust_over_core_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
- "_crust_over_core_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
- "_crust_over_core_over_links_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_over_links_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
- "_crust_over_core_over_links_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_over_links_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
- "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
+ "_connectivity_channel_test.ConnectivityStatesTest",
"_empty_message_test.EmptyMessageTest",
"_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
"_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
@@ -39,15 +23,8 @@
"_implementations_test.CallCredentialsTest",
"_implementations_test.ChannelCredentialsTest",
"_insecure_interop_test.InsecureInteropTest",
- "_intermediary_low_test.CancellationTest",
- "_intermediary_low_test.EchoTest",
- "_intermediary_low_test.ExpirationTest",
- "_intermediary_low_test.LonelyClientTest",
- "_later_test.LaterTest",
"_logging_pool_test.LoggingPoolTest",
- "_lonely_invocation_link_test.LonelyInvocationLinkTest",
- "_low_test.HangingServerShutdown",
- "_low_test.InsecureServerInsecureClient",
+ "_metadata_test.MetadataTest",
"_not_found_test.NotFoundTest",
"_python_plugin_test.PythonPluginTest",
"_read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest",
@@ -55,8 +32,6 @@
"_sanity_test.Sanity",
"_secure_interop_test.SecureInteropTest",
"_thread_cleanup_test.CleanupThreadTest",
- "_transmission_test.RoundTripTest",
- "_transmission_test.TransmissionTest",
"_utilities_test.ChannelConnectivityTest",
"beta_python_plugin_test.PythonPluginTest",
"cygrpc_test.InsecureServerInsecureClient",
diff --git a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py b/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py
deleted file mode 100644
index 09ebdeff33..0000000000
--- a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py
+++ /dev/null
@@ -1,429 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests for the old '_low'."""
-
-import threading
-import time
-import unittest
-
-import six
-from six.moves import queue
-
-from grpc._adapter import _intermediary_low as _low
-
-_STREAM_LENGTH = 300
-_TIMEOUT = 5
-_AFTER_DELAY = 2
-_FUTURE = time.time() + 60 * 60 * 24
-_BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200
-_BYTE_SEQUENCE_SEQUENCE = tuple(
- bytes(bytearray((row + column) % 256 for column in range(row)))
- for row in range(_STREAM_LENGTH))
-
-
-class LonelyClientTest(unittest.TestCase):
-
- def testLonelyClient(self):
- host = 'nosuchhostexists'
- port = 54321
- method = 'test method'
- deadline = time.time() + _TIMEOUT
- after_deadline = deadline + _AFTER_DELAY
- metadata_tag = object()
- finish_tag = object()
-
- completion_queue = _low.CompletionQueue()
- channel = _low.Channel('%s:%d' % (host, port), None)
- client_call = _low.Call(channel, completion_queue, method, host, deadline)
-
- client_call.invoke(completion_queue, metadata_tag, finish_tag)
- first_event = completion_queue.get(after_deadline)
- self.assertIsNotNone(first_event)
- second_event = completion_queue.get(after_deadline)
- self.assertIsNotNone(second_event)
- kinds = [event.kind for event in (first_event, second_event)]
- six.assertCountEqual(self,
- (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH),
- kinds)
-
- self.assertIsNone(completion_queue.get(after_deadline))
-
- completion_queue.stop()
- stop_event = completion_queue.get(_FUTURE)
- self.assertEqual(_low.Event.Kind.STOP, stop_event.kind)
-
- del client_call
- del channel
- del completion_queue
-
-
-def _drive_completion_queue(completion_queue, event_queue):
- while True:
- event = completion_queue.get(_FUTURE)
- if event.kind is _low.Event.Kind.STOP:
- break
- event_queue.put(event)
-
-
-class EchoTest(unittest.TestCase):
-
- def setUp(self):
- self.host = 'localhost'
-
- self.server_completion_queue = _low.CompletionQueue()
- self.server = _low.Server(self.server_completion_queue)
- port = self.server.add_http2_addr('[::]:0')
- self.server.start()
- self.server_events = queue.Queue()
- self.server_completion_queue_thread = threading.Thread(
- target=_drive_completion_queue,
- args=(self.server_completion_queue, self.server_events))
- self.server_completion_queue_thread.start()
-
- self.client_completion_queue = _low.CompletionQueue()
- self.channel = _low.Channel('%s:%d' % (self.host, port), None)
- self.client_events = queue.Queue()
- self.client_completion_queue_thread = threading.Thread(
- target=_drive_completion_queue,
- args=(self.client_completion_queue, self.client_events))
- self.client_completion_queue_thread.start()
-
- def tearDown(self):
- self.server.stop()
- self.server.cancel_all_calls()
- self.server_completion_queue.stop()
- self.client_completion_queue.stop()
- self.server_completion_queue_thread.join()
- self.client_completion_queue_thread.join()
- del self.server
-
- def _perform_echo_test(self, test_data):
- method = 'test method'
- details = 'test details'
- server_leading_metadata_key = 'my_server_leading_key'
- server_leading_metadata_value = 'my_server_leading_value'
- server_trailing_metadata_key = 'my_server_trailing_key'
- server_trailing_metadata_value = 'my_server_trailing_value'
- client_metadata_key = 'my_client_key'
- client_metadata_value = 'my_client_value'
- server_leading_binary_metadata_key = 'my_server_leading_key-bin'
- server_leading_binary_metadata_value = b'\0'*2047
- server_trailing_binary_metadata_key = 'my_server_trailing_key-bin'
- server_trailing_binary_metadata_value = b'\0'*2047
- client_binary_metadata_key = 'my_client_key-bin'
- client_binary_metadata_value = b'\0'*2047
- deadline = _FUTURE
- metadata_tag = object()
- finish_tag = object()
- write_tag = object()
- complete_tag = object()
- service_tag = object()
- read_tag = object()
- status_tag = object()
-
- server_data = []
- client_data = []
-
- client_call = _low.Call(self.channel, self.client_completion_queue,
- method, self.host, deadline)
- client_call.add_metadata(client_metadata_key, client_metadata_value)
- client_call.add_metadata(client_binary_metadata_key,
- client_binary_metadata_value)
-
- client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
-
- self.server.service(service_tag)
- service_accepted = self.server_events.get()
- self.assertIsNotNone(service_accepted)
- self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
- self.assertIs(service_accepted.tag, service_tag)
- self.assertEqual(method.encode(), service_accepted.service_acceptance.method)
- self.assertEqual(self.host.encode(), service_accepted.service_acceptance.host)
- self.assertIsNotNone(service_accepted.service_acceptance.call)
- metadata = dict(service_accepted.metadata)
- self.assertIn(client_metadata_key.encode(), metadata)
- self.assertEqual(client_metadata_value.encode(), metadata[client_metadata_key.encode()])
- self.assertIn(client_binary_metadata_key.encode(), metadata)
- self.assertEqual(client_binary_metadata_value,
- metadata[client_binary_metadata_key.encode()])
- server_call = service_accepted.service_acceptance.call
- server_call.accept(self.server_completion_queue, finish_tag)
- server_call.add_metadata(server_leading_metadata_key,
- server_leading_metadata_value)
- server_call.add_metadata(server_leading_binary_metadata_key,
- server_leading_binary_metadata_value)
- server_call.premetadata()
-
- metadata_accepted = self.client_events.get()
- self.assertIsNotNone(metadata_accepted)
- self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
- self.assertEqual(metadata_tag, metadata_accepted.tag)
- metadata = dict(metadata_accepted.metadata)
- self.assertIn(server_leading_metadata_key.encode(), metadata)
- self.assertEqual(server_leading_metadata_value.encode(),
- metadata[server_leading_metadata_key.encode()])
- self.assertIn(server_leading_binary_metadata_key.encode(), metadata)
- self.assertEqual(server_leading_binary_metadata_value,
- metadata[server_leading_binary_metadata_key.encode()])
-
- for datum in test_data:
- client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS)
- write_accepted = self.client_events.get()
- self.assertIsNotNone(write_accepted)
- self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
- self.assertIs(write_accepted.tag, write_tag)
- self.assertIs(write_accepted.write_accepted, True)
-
- server_call.read(read_tag)
- read_accepted = self.server_events.get()
- self.assertIsNotNone(read_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNotNone(read_accepted.bytes)
- server_data.append(read_accepted.bytes)
-
- server_call.write(read_accepted.bytes, write_tag, 0)
- write_accepted = self.server_events.get()
- self.assertIsNotNone(write_accepted)
- self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
- self.assertEqual(write_tag, write_accepted.tag)
- self.assertTrue(write_accepted.write_accepted)
-
- client_call.read(read_tag)
- read_accepted = self.client_events.get()
- self.assertIsNotNone(read_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNotNone(read_accepted.bytes)
- client_data.append(read_accepted.bytes)
-
- client_call.complete(complete_tag)
- complete_accepted = self.client_events.get()
- self.assertIsNotNone(complete_accepted)
- self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
- self.assertIs(complete_accepted.tag, complete_tag)
- self.assertIs(complete_accepted.complete_accepted, True)
-
- server_call.read(read_tag)
- read_accepted = self.server_events.get()
- self.assertIsNotNone(read_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNone(read_accepted.bytes)
-
- server_call.add_metadata(server_trailing_metadata_key,
- server_trailing_metadata_value)
- server_call.add_metadata(server_trailing_binary_metadata_key,
- server_trailing_binary_metadata_value)
-
- server_call.status(_low.Status(_low.Code.OK, details), status_tag)
- server_terminal_event_one = self.server_events.get()
- server_terminal_event_two = self.server_events.get()
- if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
- status_accepted = server_terminal_event_one
- rpc_accepted = server_terminal_event_two
- else:
- status_accepted = server_terminal_event_two
- rpc_accepted = server_terminal_event_one
- self.assertIsNotNone(status_accepted)
- self.assertIsNotNone(rpc_accepted)
- self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind)
- self.assertEqual(status_tag, status_accepted.tag)
- self.assertTrue(status_accepted.complete_accepted)
- self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
- self.assertEqual(finish_tag, rpc_accepted.tag)
- self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
-
- client_call.read(read_tag)
- client_terminal_event_one = self.client_events.get()
- client_terminal_event_two = self.client_events.get()
- if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
- read_accepted = client_terminal_event_one
- finish_accepted = client_terminal_event_two
- else:
- read_accepted = client_terminal_event_two
- finish_accepted = client_terminal_event_one
- self.assertIsNotNone(read_accepted)
- self.assertIsNotNone(finish_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNone(read_accepted.bytes)
- self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind)
- self.assertEqual(finish_tag, finish_accepted.tag)
- self.assertEqual(_low.Status(_low.Code.OK, details.encode()), finish_accepted.status)
- metadata = dict(finish_accepted.metadata)
- self.assertIn(server_trailing_metadata_key.encode(), metadata)
- self.assertEqual(server_trailing_metadata_value.encode(),
- metadata[server_trailing_metadata_key.encode()])
- self.assertIn(server_trailing_binary_metadata_key.encode(), metadata)
- self.assertEqual(server_trailing_binary_metadata_value,
- metadata[server_trailing_binary_metadata_key.encode()])
- self.assertSetEqual(set(key for key, _ in finish_accepted.metadata),
- set((server_trailing_metadata_key.encode(),
- server_trailing_binary_metadata_key.encode(),)))
-
- self.assertSequenceEqual(test_data, server_data)
- self.assertSequenceEqual(test_data, client_data)
-
- def testNoEcho(self):
- self._perform_echo_test(())
-
- def testOneByteEcho(self):
- self._perform_echo_test([b'\x07'])
-
- def testOneManyByteEcho(self):
- self._perform_echo_test([_BYTE_SEQUENCE])
-
- def testManyOneByteEchoes(self):
- self._perform_echo_test(
- [_BYTE_SEQUENCE[i:i+1] for i in range(len(_BYTE_SEQUENCE))])
-
- def testManyManyByteEchoes(self):
- self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE)
-
-
-class CancellationTest(unittest.TestCase):
-
- def setUp(self):
- self.host = 'localhost'
-
- self.server_completion_queue = _low.CompletionQueue()
- self.server = _low.Server(self.server_completion_queue)
- port = self.server.add_http2_addr('[::]:0')
- self.server.start()
- self.server_events = queue.Queue()
- self.server_completion_queue_thread = threading.Thread(
- target=_drive_completion_queue,
- args=(self.server_completion_queue, self.server_events))
- self.server_completion_queue_thread.start()
-
- self.client_completion_queue = _low.CompletionQueue()
- self.channel = _low.Channel('%s:%d' % (self.host, port), None)
- self.client_events = queue.Queue()
- self.client_completion_queue_thread = threading.Thread(
- target=_drive_completion_queue,
- args=(self.client_completion_queue, self.client_events))
- self.client_completion_queue_thread.start()
-
- def tearDown(self):
- self.server.stop()
- self.server.cancel_all_calls()
- self.server_completion_queue.stop()
- self.client_completion_queue.stop()
- self.server_completion_queue_thread.join()
- self.client_completion_queue_thread.join()
- del self.server
-
- def testCancellation(self):
- method = 'test method'
- deadline = _FUTURE
- metadata_tag = object()
- finish_tag = object()
- write_tag = object()
- service_tag = object()
- read_tag = object()
- test_data = _BYTE_SEQUENCE_SEQUENCE
-
- server_data = []
- client_data = []
-
- client_call = _low.Call(self.channel, self.client_completion_queue,
- method, self.host, deadline)
-
- client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
-
- self.server.service(service_tag)
- service_accepted = self.server_events.get()
- server_call = service_accepted.service_acceptance.call
-
- server_call.accept(self.server_completion_queue, finish_tag)
- server_call.premetadata()
-
- metadata_accepted = self.client_events.get()
- self.assertIsNotNone(metadata_accepted)
-
- for datum in test_data:
- client_call.write(datum, write_tag, 0)
- write_accepted = self.client_events.get()
-
- server_call.read(read_tag)
- read_accepted = self.server_events.get()
- server_data.append(read_accepted.bytes)
-
- server_call.write(read_accepted.bytes, write_tag, 0)
- write_accepted = self.server_events.get()
- self.assertIsNotNone(write_accepted)
-
- client_call.read(read_tag)
- read_accepted = self.client_events.get()
- client_data.append(read_accepted.bytes)
-
- client_call.cancel()
- # cancel() is idempotent.
- client_call.cancel()
- client_call.cancel()
- client_call.cancel()
-
- server_call.read(read_tag)
-
- server_terminal_event_one = self.server_events.get()
- server_terminal_event_two = self.server_events.get()
- if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
- read_accepted = server_terminal_event_one
- rpc_accepted = server_terminal_event_two
- else:
- read_accepted = server_terminal_event_two
- rpc_accepted = server_terminal_event_one
- self.assertIsNotNone(read_accepted)
- self.assertIsNotNone(rpc_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertIsNone(read_accepted.bytes)
- self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
- self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
-
- finish_event = self.client_events.get()
- self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
- self.assertEqual(_low.Status(_low.Code.CANCELLED, b'Cancelled'),
- finish_event.status)
-
- self.assertSequenceEqual(test_data, server_data)
- self.assertSequenceEqual(test_data, client_data)
-
-
-class ExpirationTest(unittest.TestCase):
-
- @unittest.skip('TODO(nathaniel): Expiration test!')
- def testExpiration(self):
- pass
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
-
diff --git a/src/python/grpcio/tests/unit/_adapter/_low_test.py b/src/python/grpcio/tests/unit/_adapter/_low_test.py
deleted file mode 100644
index e09a1f2564..0000000000
--- a/src/python/grpcio/tests/unit/_adapter/_low_test.py
+++ /dev/null
@@ -1,319 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import threading
-import time
-import unittest
-
-from grpc import _grpcio_metadata
-from grpc._adapter import _types
-from grpc._adapter import _low
-from tests.unit import test_common
-
-
-def wait_for_events(completion_queues, deadline):
- """
- Args:
- completion_queues: list of completion queues to wait for events on
- deadline: absolute deadline to wait until
-
- Returns:
- a sequence of events of length len(completion_queues).
- """
-
- results = [None] * len(completion_queues)
- lock = threading.Lock()
- threads = []
- def set_ith_result(i, completion_queue):
- result = completion_queue.next(deadline)
- with lock:
- results[i] = result
- for i, completion_queue in enumerate(completion_queues):
- thread = threading.Thread(target=set_ith_result,
- args=[i, completion_queue])
- thread.start()
- threads.append(thread)
- for thread in threads:
- thread.join()
- return results
-
-
-class InsecureServerInsecureClient(unittest.TestCase):
-
- def setUp(self):
- self.server_completion_queue = _low.CompletionQueue()
- self.server = _low.Server(self.server_completion_queue, [])
- self.port = self.server.add_http2_port('[::]:0')
- self.client_completion_queue = _low.CompletionQueue()
- self.client_channel = _low.Channel('localhost:%d'%self.port, [])
-
- self.server.start()
-
- def tearDown(self):
- self.server.shutdown()
- del self.client_channel
-
- self.client_completion_queue.shutdown()
- while (self.client_completion_queue.next(float('+inf')).type !=
- _types.EventType.QUEUE_SHUTDOWN):
- pass
- self.server_completion_queue.shutdown()
- while (self.server_completion_queue.next(float('+inf')).type !=
- _types.EventType.QUEUE_SHUTDOWN):
- pass
-
- del self.client_completion_queue
- del self.server_completion_queue
- del self.server
-
- def testEcho(self):
- deadline = time.time() + 5
- event_time_tolerance = 2
- deadline_tolerance = 0.25
- client_metadata_ascii_key = 'key'
- client_metadata_ascii_value = 'val'
- client_metadata_bin_key = 'key-bin'
- client_metadata_bin_value = b'\0'*1000
- server_initial_metadata_key = 'init_me_me_me'
- server_initial_metadata_value = 'whodawha?'
- server_trailing_metadata_key = 'california_is_in_a_drought'
- server_trailing_metadata_value = 'zomg it is'
- server_status_code = _types.StatusCode.OK
- server_status_details = 'our work is never over'
- request = 'blarghaflargh'
- response = 'his name is robert paulson'
- method = 'twinkies'
- host = 'hostess'
- server_request_tag = object()
- request_call_result = self.server.request_call(self.server_completion_queue,
- server_request_tag)
-
- self.assertEqual(_types.CallError.OK, request_call_result)
-
- client_call_tag = object()
- client_call = self.client_channel.create_call(
- self.client_completion_queue, method, host, deadline)
- client_initial_metadata = [
- (client_metadata_ascii_key, client_metadata_ascii_value),
- (client_metadata_bin_key, client_metadata_bin_value)
- ]
- client_start_batch_result = client_call.start_batch([
- _types.OpArgs.send_initial_metadata(client_initial_metadata),
- _types.OpArgs.send_message(request, 0),
- _types.OpArgs.send_close_from_client(),
- _types.OpArgs.recv_initial_metadata(),
- _types.OpArgs.recv_message(),
- _types.OpArgs.recv_status_on_client()
- ], client_call_tag)
- self.assertEqual(_types.CallError.OK, client_start_batch_result)
-
- client_no_event, request_event, = wait_for_events(
- [self.client_completion_queue, self.server_completion_queue],
- time.time() + event_time_tolerance)
- self.assertEqual(client_no_event, None)
- self.assertEqual(_types.EventType.OP_COMPLETE, request_event.type)
- self.assertIsInstance(request_event.call, _low.Call)
- self.assertIs(server_request_tag, request_event.tag)
- self.assertEqual(1, len(request_event.results))
- received_initial_metadata = request_event.results[0].initial_metadata
- # Check that our metadata were transmitted
- self.assertTrue(test_common.metadata_transmitted(client_initial_metadata,
- received_initial_metadata))
- # Check that Python's user agent string is a part of the full user agent
- # string
- received_initial_metadata_dict = dict(received_initial_metadata)
- self.assertIn(b'user-agent', received_initial_metadata_dict)
- self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__).encode(),
- received_initial_metadata_dict[b'user-agent'])
- self.assertEqual(method.encode(), request_event.call_details.method)
- self.assertEqual(host.encode(), request_event.call_details.host)
- self.assertLess(abs(deadline - request_event.call_details.deadline),
- deadline_tolerance)
-
- # Check that the channel is connected, and that both it and the call have
- # the proper target and peer; do this after the first flurry of messages to
- # avoid the possibility that connection was delayed by the core until the
- # first message was sent.
- self.assertEqual(_types.ConnectivityState.READY,
- self.client_channel.check_connectivity_state(False))
- self.assertIsNotNone(self.client_channel.target())
- self.assertIsNotNone(client_call.peer())
-
- server_call_tag = object()
- server_call = request_event.call
- server_initial_metadata = [
- (server_initial_metadata_key, server_initial_metadata_value)
- ]
- server_trailing_metadata = [
- (server_trailing_metadata_key, server_trailing_metadata_value)
- ]
- server_start_batch_result = server_call.start_batch([
- _types.OpArgs.send_initial_metadata(server_initial_metadata),
- _types.OpArgs.recv_message(),
- _types.OpArgs.send_message(response, 0),
- _types.OpArgs.recv_close_on_server(),
- _types.OpArgs.send_status_from_server(
- server_trailing_metadata, server_status_code, server_status_details)
- ], server_call_tag)
- self.assertEqual(_types.CallError.OK, server_start_batch_result)
-
- client_event, server_event, = wait_for_events(
- [self.client_completion_queue, self.server_completion_queue],
- time.time() + event_time_tolerance)
-
- self.assertEqual(6, len(client_event.results))
- found_client_op_types = set()
- for client_result in client_event.results:
- # we expect each op type to be unique
- self.assertNotIn(client_result.type, found_client_op_types)
- found_client_op_types.add(client_result.type)
- if client_result.type == _types.OpType.RECV_INITIAL_METADATA:
- self.assertTrue(
- test_common.metadata_transmitted(server_initial_metadata,
- client_result.initial_metadata))
- elif client_result.type == _types.OpType.RECV_MESSAGE:
- self.assertEqual(response.encode(), client_result.message)
- elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT:
- self.assertTrue(
- test_common.metadata_transmitted(server_trailing_metadata,
- client_result.trailing_metadata))
- self.assertEqual(server_status_details.encode(), client_result.status.details)
- self.assertEqual(server_status_code, client_result.status.code)
- self.assertEqual(set([
- _types.OpType.SEND_INITIAL_METADATA,
- _types.OpType.SEND_MESSAGE,
- _types.OpType.SEND_CLOSE_FROM_CLIENT,
- _types.OpType.RECV_INITIAL_METADATA,
- _types.OpType.RECV_MESSAGE,
- _types.OpType.RECV_STATUS_ON_CLIENT
- ]), found_client_op_types)
-
- self.assertEqual(5, len(server_event.results))
- found_server_op_types = set()
- for server_result in server_event.results:
- self.assertNotIn(client_result.type, found_server_op_types)
- found_server_op_types.add(server_result.type)
- if server_result.type == _types.OpType.RECV_MESSAGE:
- self.assertEqual(request.encode(), server_result.message)
- elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER:
- self.assertFalse(server_result.cancelled)
- self.assertEqual(set([
- _types.OpType.SEND_INITIAL_METADATA,
- _types.OpType.RECV_MESSAGE,
- _types.OpType.SEND_MESSAGE,
- _types.OpType.RECV_CLOSE_ON_SERVER,
- _types.OpType.SEND_STATUS_FROM_SERVER
- ]), found_server_op_types)
-
- del client_call
- del server_call
-
-
-class HangingServerShutdown(unittest.TestCase):
-
- def setUp(self):
- self.server_completion_queue = _low.CompletionQueue()
- self.server = _low.Server(self.server_completion_queue, [])
- self.port = self.server.add_http2_port('[::]:0')
- self.client_completion_queue = _low.CompletionQueue()
- self.client_channel = _low.Channel('localhost:%d'%self.port, [])
-
- self.server.start()
-
- def tearDown(self):
- self.server.shutdown()
- del self.client_channel
-
- self.client_completion_queue.shutdown()
- self.server_completion_queue.shutdown()
- while True:
- client_event, server_event = wait_for_events(
- [self.client_completion_queue, self.server_completion_queue],
- float("+inf"))
- if (client_event.type == _types.EventType.QUEUE_SHUTDOWN and
- server_event.type == _types.EventType.QUEUE_SHUTDOWN):
- break
-
- del self.client_completion_queue
- del self.server_completion_queue
- del self.server
-
- def testHangingServerCall(self):
- deadline = time.time() + 5
- deadline_tolerance = 0.25
- event_time_tolerance = 2
- cancel_all_calls_time_tolerance = 0.5
- request = 'blarghaflargh'
- method = 'twinkies'
- host = 'hostess'
- server_request_tag = object()
- request_call_result = self.server.request_call(self.server_completion_queue,
- server_request_tag)
-
- client_call_tag = object()
- client_call = self.client_channel.create_call(self.client_completion_queue,
- method, host, deadline)
- client_start_batch_result = client_call.start_batch([
- _types.OpArgs.send_initial_metadata([]),
- _types.OpArgs.send_message(request, 0),
- _types.OpArgs.send_close_from_client(),
- _types.OpArgs.recv_initial_metadata(),
- _types.OpArgs.recv_message(),
- _types.OpArgs.recv_status_on_client()
- ], client_call_tag)
-
- client_no_event, request_event, = wait_for_events(
- [self.client_completion_queue, self.server_completion_queue],
- time.time() + event_time_tolerance)
-
- # Now try to shutdown the server and expect that we see server shutdown
- # almost immediately after calling cancel_all_calls.
-
- # First attempt to cancel all calls before shutting down, and expect
- # our state machine to catch the erroneous API use.
- with self.assertRaises(RuntimeError):
- self.server.cancel_all_calls()
-
- shutdown_tag = object()
- self.server.shutdown(shutdown_tag)
- pre_cancel_timestamp = time.time()
- self.server.cancel_all_calls()
- finish_shutdown_timestamp = None
- client_call_event, server_shutdown_event = wait_for_events(
- [self.client_completion_queue, self.server_completion_queue],
- time.time() + event_time_tolerance)
- self.assertIs(shutdown_tag, server_shutdown_event.tag)
- self.assertGreater(pre_cancel_timestamp + cancel_all_calls_time_tolerance,
- time.time())
-
- del client_call
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_api_test.py b/src/python/grpcio/tests/unit/_api_test.py
new file mode 100644
index 0000000000..2fe89499f5
--- /dev/null
+++ b/src/python/grpcio/tests/unit/_api_test.py
@@ -0,0 +1,111 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test of gRPC Python's application-layer API."""
+
+import unittest
+
+import six
+
+import grpc
+
+from tests.unit import _from_grpc_import_star
+
+
+class AllTest(unittest.TestCase):
+
+ def testAll(self):
+ expected_grpc_code_elements = (
+ 'FutureTimeoutError',
+ 'FutureCancelledError',
+ 'Future',
+ 'ChannelConnectivity',
+ 'StatusCode',
+ 'RpcError',
+ 'RpcContext',
+ 'Call',
+ 'ChannelCredentials',
+ 'CallCredentials',
+ 'AuthMetadataContext',
+ 'AuthMetadataPluginCallback',
+ 'AuthMetadataPlugin',
+ 'ServerCredentials',
+ 'UnaryUnaryMultiCallable',
+ 'UnaryStreamMultiCallable',
+ 'StreamUnaryMultiCallable',
+ 'StreamStreamMultiCallable',
+ 'Channel',
+ 'ServicerContext',
+ 'RpcMethodHandler',
+ 'HandlerCallDetails',
+ 'GenericRpcHandler',
+ 'Server',
+ 'unary_unary_rpc_method_handler',
+ 'unary_stream_rpc_method_handler',
+ 'stream_unary_rpc_method_handler',
+ 'stream_stream_rpc_method_handler',
+ 'method_handlers_generic_handler',
+ 'ssl_channel_credentials',
+ 'metadata_call_credentials',
+ 'access_token_call_credentials',
+ 'composite_call_credentials',
+ 'composite_channel_credentials',
+ 'ssl_server_credentials',
+ 'channel_ready_future',
+ 'insecure_channel',
+ 'secure_channel',
+ 'server',
+ )
+
+ six.assertCountEqual(
+ self, expected_grpc_code_elements,
+ _from_grpc_import_star.GRPC_ELEMENTS)
+
+
+class ChannelConnectivityTest(unittest.TestCase):
+
+ def testChannelConnectivity(self):
+ self.assertSequenceEqual(
+ (grpc.ChannelConnectivity.IDLE,
+ grpc.ChannelConnectivity.CONNECTING,
+ grpc.ChannelConnectivity.READY,
+ grpc.ChannelConnectivity.TRANSIENT_FAILURE,
+ grpc.ChannelConnectivity.SHUTDOWN,),
+ tuple(grpc.ChannelConnectivity))
+
+
+class ChannelTest(unittest.TestCase):
+
+ def test_secure_channel(self):
+ channel_credentials = grpc.ssl_channel_credentials()
+ channel = grpc.secure_channel('google.com:443', channel_credentials)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_channel_connectivity_test.py b/src/python/grpcio/tests/unit/_channel_connectivity_test.py
index a1575efada..ae8de523ec 100644
--- a/src/python/grpcio/tests/unit/_channel_connectivity_test.py
+++ b/src/python/grpcio/tests/unit/_channel_connectivity_test.py
@@ -135,12 +135,12 @@ class ChannelConnectivityTest(unittest.TestCase):
self.assertNotIn(
grpc.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities)
self.assertNotIn(
- grpc.ChannelConnectivity.FATAL_FAILURE, third_connectivities)
+ grpc.ChannelConnectivity.SHUTDOWN, third_connectivities)
self.assertNotIn(
grpc.ChannelConnectivity.TRANSIENT_FAILURE,
fourth_connectivities)
self.assertNotIn(
- grpc.ChannelConnectivity.FATAL_FAILURE, fourth_connectivities)
+ grpc.ChannelConnectivity.SHUTDOWN, fourth_connectivities)
def test_reachable_then_unreachable_channel_connectivity(self):
server = _server.Server((), futures.ThreadPoolExecutor(max_workers=0))
diff --git a/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py b/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py
deleted file mode 100644
index 2b8981c752..0000000000
--- a/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py
+++ /dev/null
@@ -1,157 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests Base interface compliance of the core-over-gRPC-links stack."""
-
-import collections
-import logging
-import random
-import time
-import unittest
-
-import six
-
-from grpc._adapter import _intermediary_low
-from grpc._links import invocation
-from grpc._links import service
-from grpc.beta import interfaces as beta_interfaces
-from grpc.framework.core import implementations
-from grpc.framework.interfaces.base import utilities
-from tests.unit import test_common as grpc_test_common
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.base import test_cases
-from tests.unit.framework.interfaces.base import test_interfaces
-
-
-class _SerializationBehaviors(
- collections.namedtuple(
- '_SerializationBehaviors',
- ('request_serializers', 'request_deserializers', 'response_serializers',
- 'response_deserializers',))):
- pass
-
-
-class _Links(
- collections.namedtuple(
- '_Links',
- ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link',
- 'service_end_link'))):
- pass
-
-
-def _serialization_behaviors_from_serializations(serializations):
- request_serializers = {}
- request_deserializers = {}
- response_serializers = {}
- response_deserializers = {}
- for (group, method), serialization in six.iteritems(serializations):
- request_serializers[group, method] = serialization.serialize_request
- request_deserializers[group, method] = serialization.deserialize_request
- response_serializers[group, method] = serialization.serialize_response
- response_deserializers[group, method] = serialization.deserialize_response
- return _SerializationBehaviors(
- request_serializers, request_deserializers, response_serializers,
- response_deserializers)
-
-
-class _Implementation(test_interfaces.Implementation):
-
- def instantiate(self, serializations, servicer):
- serialization_behaviors = _serialization_behaviors_from_serializations(
- serializations)
- invocation_end_link = implementations.invocation_end_link()
- service_end_link = implementations.service_end_link(
- servicer, test_constants.DEFAULT_TIMEOUT,
- test_constants.MAXIMUM_TIMEOUT)
- service_grpc_link = service.service_link(
- serialization_behaviors.request_deserializers,
- serialization_behaviors.response_serializers)
- port = service_grpc_link.add_port('[::]:0', None)
- channel = _intermediary_low.Channel('localhost:%d' % port, None)
- invocation_grpc_link = invocation.invocation_link(
- channel, b'localhost', None,
- serialization_behaviors.request_serializers,
- serialization_behaviors.response_deserializers)
-
- invocation_end_link.join_link(invocation_grpc_link)
- invocation_grpc_link.join_link(invocation_end_link)
- service_end_link.join_link(service_grpc_link)
- service_grpc_link.join_link(service_end_link)
- invocation_grpc_link.start()
- service_grpc_link.start()
- return invocation_end_link, service_end_link, (
- invocation_grpc_link, service_grpc_link)
-
- def destantiate(self, memo):
- invocation_grpc_link, service_grpc_link = memo
- invocation_grpc_link.stop()
- service_grpc_link.begin_stop()
- service_grpc_link.end_stop()
-
- def invocation_initial_metadata(self):
- return grpc_test_common.INVOCATION_INITIAL_METADATA
-
- def service_initial_metadata(self):
- return grpc_test_common.SERVICE_INITIAL_METADATA
-
- def invocation_completion(self):
- return utilities.completion(None, None, None)
-
- def service_completion(self):
- return utilities.completion(
- grpc_test_common.SERVICE_TERMINAL_METADATA,
- beta_interfaces.StatusCode.OK, grpc_test_common.DETAILS)
-
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- return original_metadata is None or grpc_test_common.metadata_transmitted(
- original_metadata, transmitted_metadata)
-
- def completion_transmitted(self, original_completion, transmitted_completion):
- if (original_completion.terminal_metadata is not None and
- not grpc_test_common.metadata_transmitted(
- original_completion.terminal_metadata,
- transmitted_completion.terminal_metadata)):
- return False
- elif original_completion.code is not transmitted_completion.code:
- return False
- elif original_completion.message != transmitted_completion.message:
- return False
- else:
- return True
-
-
-def load_tests(loader, tests, pattern):
- return unittest.TestSuite(
- tests=tuple(
- loader.loadTestsFromTestCase(test_case_class)
- for test_case_class in test_cases.test_cases(_Implementation())))
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py b/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py
deleted file mode 100644
index 50b9a5a824..0000000000
--- a/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py
+++ /dev/null
@@ -1,163 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests Face compliance of the crust-over-core-over-gRPC-links stack."""
-
-import collections
-import unittest
-
-import six
-
-from grpc._adapter import _intermediary_low
-from grpc._links import invocation
-from grpc._links import service
-from grpc.beta import interfaces as beta_interfaces
-from grpc.framework.core import implementations as core_implementations
-from grpc.framework.crust import implementations as crust_implementations
-from grpc.framework.foundation import logging_pool
-from grpc.framework.interfaces.links import utilities
-from tests.unit import test_common as grpc_test_common
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.face import test_cases
-from tests.unit.framework.interfaces.face import test_interfaces
-
-
-class _SerializationBehaviors(
- collections.namedtuple(
- '_SerializationBehaviors',
- ('request_serializers', 'request_deserializers', 'response_serializers',
- 'response_deserializers',))):
- pass
-
-
-def _serialization_behaviors_from_test_methods(test_methods):
- request_serializers = {}
- request_deserializers = {}
- response_serializers = {}
- response_deserializers = {}
- for (group, method), test_method in six.iteritems(test_methods):
- request_serializers[group, method] = test_method.serialize_request
- request_deserializers[group, method] = test_method.deserialize_request
- response_serializers[group, method] = test_method.serialize_response
- response_deserializers[group, method] = test_method.deserialize_response
- return _SerializationBehaviors(
- request_serializers, request_deserializers, response_serializers,
- response_deserializers)
-
-
-class _Implementation(test_interfaces.Implementation):
-
- def instantiate(
- self, methods, method_implementations, multi_method_implementation):
- pool = logging_pool.pool(test_constants.POOL_SIZE)
- servicer = crust_implementations.servicer(
- method_implementations, multi_method_implementation, pool)
- serialization_behaviors = _serialization_behaviors_from_test_methods(
- methods)
- invocation_end_link = core_implementations.invocation_end_link()
- service_end_link = core_implementations.service_end_link(
- servicer, test_constants.DEFAULT_TIMEOUT,
- test_constants.MAXIMUM_TIMEOUT)
- service_grpc_link = service.service_link(
- serialization_behaviors.request_deserializers,
- serialization_behaviors.response_serializers)
- port = service_grpc_link.add_port('[::]:0', None)
- channel = _intermediary_low.Channel('localhost:%d' % port, None)
- invocation_grpc_link = invocation.invocation_link(
- channel, b'localhost', None,
- serialization_behaviors.request_serializers,
- serialization_behaviors.response_deserializers)
-
- invocation_end_link.join_link(invocation_grpc_link)
- invocation_grpc_link.join_link(invocation_end_link)
- service_grpc_link.join_link(service_end_link)
- service_end_link.join_link(service_grpc_link)
- service_end_link.start()
- invocation_end_link.start()
- invocation_grpc_link.start()
- service_grpc_link.start()
-
- generic_stub = crust_implementations.generic_stub(invocation_end_link, pool)
- # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
- group = next(iter(methods))[0]
- # TODO(nathaniel): Add a "cardinalities_by_group" attribute to
- # _digest.TestServiceDigest.
- cardinalities = {
- method: method_object.cardinality()
- for (group, method), method_object in six.iteritems(methods)}
- dynamic_stub = crust_implementations.dynamic_stub(
- invocation_end_link, group, cardinalities, pool)
-
- return generic_stub, {group: dynamic_stub}, (
- invocation_end_link, invocation_grpc_link, service_grpc_link,
- service_end_link, pool)
-
- def destantiate(self, memo):
- (invocation_end_link, invocation_grpc_link, service_grpc_link,
- service_end_link, pool) = memo
- invocation_end_link.stop(0).wait()
- invocation_grpc_link.stop()
- service_grpc_link.begin_stop()
- service_end_link.stop(0).wait()
- service_grpc_link.end_stop()
- invocation_end_link.join_link(utilities.NULL_LINK)
- invocation_grpc_link.join_link(utilities.NULL_LINK)
- service_grpc_link.join_link(utilities.NULL_LINK)
- service_end_link.join_link(utilities.NULL_LINK)
- pool.shutdown(wait=True)
-
- def invocation_metadata(self):
- return grpc_test_common.INVOCATION_INITIAL_METADATA
-
- def initial_metadata(self):
- return grpc_test_common.SERVICE_INITIAL_METADATA
-
- def terminal_metadata(self):
- return grpc_test_common.SERVICE_TERMINAL_METADATA
-
- def code(self):
- return beta_interfaces.StatusCode.OK
-
- def details(self):
- return grpc_test_common.DETAILS
-
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- return original_metadata is None or grpc_test_common.metadata_transmitted(
- original_metadata, transmitted_metadata)
-
-
-def load_tests(loader, tests, pattern):
- return unittest.TestSuite(
- tests=tuple(
- loader.loadTestsFromTestCase(test_case_class)
- for test_case_class in test_cases.test_cases(_Implementation())))
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_from_grpc_import_star.py b/src/python/grpcio/tests/unit/_from_grpc_import_star.py
new file mode 100644
index 0000000000..78d2fb7dc5
--- /dev/null
+++ b/src/python/grpcio/tests/unit/_from_grpc_import_star.py
@@ -0,0 +1,38 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+_BEFORE_IMPORT = tuple(globals())
+
+from grpc import *
+
+_AFTER_IMPORT = tuple(globals())
+
+GRPC_ELEMENTS = tuple(
+ element for element in _AFTER_IMPORT
+ if element not in _BEFORE_IMPORT and element != '_BEFORE_IMPORT')
diff --git a/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py b/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py
deleted file mode 100644
index 890755f81c..0000000000
--- a/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py
+++ /dev/null
@@ -1,88 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""A test of invocation-side code unconnected to an RPC server."""
-
-import unittest
-
-from grpc._adapter import _intermediary_low
-from grpc._links import invocation
-from grpc.framework.interfaces.links import links
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.links import test_cases
-from tests.unit.framework.interfaces.links import test_utilities
-
-_NULL_BEHAVIOR = lambda unused_argument: None
-
-
-class LonelyInvocationLinkTest(unittest.TestCase):
-
- def testUpAndDown(self):
- channel = _intermediary_low.Channel('nonexistent:54321', None)
- invocation_link = invocation.invocation_link(
- channel, 'nonexistent', None, {}, {})
-
- invocation_link.start()
- invocation_link.stop()
-
- def _test_lonely_invocation_with_termination(self, termination):
- test_operation_id = object()
- test_group = 'test package.Test Service'
- test_method = 'test method'
- invocation_link_mate = test_utilities.RecordingLink()
-
- channel = _intermediary_low.Channel('nonexistent:54321', None)
- invocation_link = invocation.invocation_link(
- channel, 'nonexistent', None, {}, {})
- invocation_link.join_link(invocation_link_mate)
- invocation_link.start()
-
- ticket = links.Ticket(
- test_operation_id, 0, test_group, test_method,
- links.Ticket.Subscription.FULL, test_constants.SHORT_TIMEOUT, 1, None,
- None, None, None, None, termination, None)
- invocation_link.accept_ticket(ticket)
- invocation_link_mate.block_until_tickets_satisfy(test_cases.terminated)
-
- invocation_link.stop()
-
- self.assertIsNot(
- invocation_link_mate.tickets()[-1].termination,
- links.Ticket.Termination.COMPLETION)
-
- def testLonelyInvocationLinkWithCommencementTicket(self):
- self._test_lonely_invocation_with_termination(None)
-
- def testLonelyInvocationLinkWithEntireTicket(self):
- self._test_lonely_invocation_with_termination(
- links.Ticket.Termination.COMPLETION)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/src/python/grpcio/tests/unit/_links/_transmission_test.py b/src/python/grpcio/tests/unit/_links/_transmission_test.py
deleted file mode 100644
index 1f6edd18ca..0000000000
--- a/src/python/grpcio/tests/unit/_links/_transmission_test.py
+++ /dev/null
@@ -1,239 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests transmission of tickets across gRPC-on-the-wire."""
-
-import unittest
-
-from grpc._adapter import _intermediary_low
-from grpc._links import invocation
-from grpc._links import service
-from grpc.beta import interfaces as beta_interfaces
-from grpc.framework.interfaces.links import links
-from tests.unit import test_common
-from tests.unit._links import _proto_scenarios
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.links import test_cases
-from tests.unit.framework.interfaces.links import test_utilities
-
-_IDENTITY = lambda x: x
-
-
-class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
-
- def create_transmitting_links(self):
- service_link = service.service_link(
- {self.group_and_method(): self.deserialize_request},
- {self.group_and_method(): self.serialize_response})
- port = service_link.add_port('[::]:0', None)
- service_link.start()
- channel = _intermediary_low.Channel('localhost:%d' % port, None)
- invocation_link = invocation.invocation_link(
- channel, 'localhost', None,
- {self.group_and_method(): self.serialize_request},
- {self.group_and_method(): self.deserialize_response})
- invocation_link.start()
- return invocation_link, service_link
-
- def destroy_transmitting_links(self, invocation_side_link, service_side_link):
- invocation_side_link.stop()
- service_side_link.begin_stop()
- service_side_link.end_stop()
-
- def create_invocation_initial_metadata(self):
- return (
- ('first_invocation_initial_metadata_key', 'just a string value'),
- ('second_invocation_initial_metadata_key', '0123456789'),
- ('third_invocation_initial_metadata_key-bin', '\x00\x57' * 100),
- )
-
- def create_invocation_terminal_metadata(self):
- return None
-
- def create_service_initial_metadata(self):
- return (
- ('first_service_initial_metadata_key', 'just another string value'),
- ('second_service_initial_metadata_key', '9876543210'),
- ('third_service_initial_metadata_key-bin', '\x00\x59\x02' * 100),
- )
-
- def create_service_terminal_metadata(self):
- return (
- ('first_service_terminal_metadata_key', 'yet another string value'),
- ('second_service_terminal_metadata_key', 'abcdefghij'),
- ('third_service_terminal_metadata_key-bin', '\x00\x37' * 100),
- )
-
- def create_invocation_completion(self):
- return None, None
-
- def create_service_completion(self):
- return (
- beta_interfaces.StatusCode.OK, b'An exuberant test "details" message!')
-
- def assertMetadataTransmitted(self, original_metadata, transmitted_metadata):
- self.assertTrue(
- test_common.metadata_transmitted(
- original_metadata, transmitted_metadata),
- '%s erroneously transmitted as %s' % (
- original_metadata, transmitted_metadata))
-
-
-class RoundTripTest(unittest.TestCase):
-
- def testZeroMessageRoundTrip(self):
- test_operation_id = object()
- test_group = 'test package.Test Group'
- test_method = 'test method'
- identity_transformation = {(test_group, test_method): _IDENTITY}
- test_code = beta_interfaces.StatusCode.OK
- test_message = 'a test message'
-
- service_link = service.service_link(
- identity_transformation, identity_transformation)
- service_mate = test_utilities.RecordingLink()
- service_link.join_link(service_mate)
- port = service_link.add_port('[::]:0', None)
- service_link.start()
- channel = _intermediary_low.Channel('localhost:%d' % port, None)
- invocation_link = invocation.invocation_link(
- channel, None, None, identity_transformation, identity_transformation)
- invocation_mate = test_utilities.RecordingLink()
- invocation_link.join_link(invocation_mate)
- invocation_link.start()
-
- invocation_ticket = links.Ticket(
- test_operation_id, 0, test_group, test_method,
- links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
- None, None, None, None, links.Ticket.Termination.COMPLETION, None)
- invocation_link.accept_ticket(invocation_ticket)
- service_mate.block_until_tickets_satisfy(test_cases.terminated)
-
- service_ticket = links.Ticket(
- service_mate.tickets()[-1].operation_id, 0, None, None, None, None,
- None, None, None, None, test_code, test_message,
- links.Ticket.Termination.COMPLETION, None)
- service_link.accept_ticket(service_ticket)
- invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
-
- invocation_link.stop()
- service_link.begin_stop()
- service_link.end_stop()
-
- self.assertIs(
- service_mate.tickets()[-1].termination,
- links.Ticket.Termination.COMPLETION)
- self.assertIs(
- invocation_mate.tickets()[-1].termination,
- links.Ticket.Termination.COMPLETION)
- self.assertIs(invocation_mate.tickets()[-1].code, test_code)
- self.assertEqual(invocation_mate.tickets()[-1].message, test_message.encode())
-
- def _perform_scenario_test(self, scenario):
- test_operation_id = object()
- test_group, test_method = scenario.group_and_method()
- test_code = beta_interfaces.StatusCode.OK
- test_message = 'a scenario test message'
-
- service_link = service.service_link(
- {(test_group, test_method): scenario.deserialize_request},
- {(test_group, test_method): scenario.serialize_response})
- service_mate = test_utilities.RecordingLink()
- service_link.join_link(service_mate)
- port = service_link.add_port('[::]:0', None)
- service_link.start()
- channel = _intermediary_low.Channel('localhost:%d' % port, None)
- invocation_link = invocation.invocation_link(
- channel, 'localhost', None,
- {(test_group, test_method): scenario.serialize_request},
- {(test_group, test_method): scenario.deserialize_response})
- invocation_mate = test_utilities.RecordingLink()
- invocation_link.join_link(invocation_mate)
- invocation_link.start()
-
- invocation_ticket = links.Ticket(
- test_operation_id, 0, test_group, test_method,
- links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
- None, None, None, None, None, None)
- invocation_link.accept_ticket(invocation_ticket)
- requests = scenario.requests()
- for request_index, request in enumerate(requests):
- request_ticket = links.Ticket(
- test_operation_id, 1 + request_index, None, None, None, None, 1, None,
- request, None, None, None, None, None)
- invocation_link.accept_ticket(request_ticket)
- service_mate.block_until_tickets_satisfy(
- test_cases.at_least_n_payloads_received_predicate(1 + request_index))
- response_ticket = links.Ticket(
- service_mate.tickets()[0].operation_id, request_index, None, None,
- None, None, 1, None, scenario.response_for_request(request), None,
- None, None, None, None)
- service_link.accept_ticket(response_ticket)
- invocation_mate.block_until_tickets_satisfy(
- test_cases.at_least_n_payloads_received_predicate(1 + request_index))
- request_count = len(requests)
- invocation_completion_ticket = links.Ticket(
- test_operation_id, request_count + 1, None, None, None, None, None,
- None, None, None, None, None, links.Ticket.Termination.COMPLETION,
- None)
- invocation_link.accept_ticket(invocation_completion_ticket)
- service_mate.block_until_tickets_satisfy(test_cases.terminated)
- service_completion_ticket = links.Ticket(
- service_mate.tickets()[0].operation_id, request_count, None, None, None,
- None, None, None, None, None, test_code, test_message,
- links.Ticket.Termination.COMPLETION, None)
- service_link.accept_ticket(service_completion_ticket)
- invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
-
- invocation_link.stop()
- service_link.begin_stop()
- service_link.end_stop()
-
- observed_requests = tuple(
- ticket.payload for ticket in service_mate.tickets()
- if ticket.payload is not None)
- observed_responses = tuple(
- ticket.payload for ticket in invocation_mate.tickets()
- if ticket.payload is not None)
- self.assertTrue(scenario.verify_requests(observed_requests))
- self.assertTrue(scenario.verify_responses(observed_responses))
-
- def testEmptyScenario(self):
- self._perform_scenario_test(_proto_scenarios.EmptyScenario())
-
- def testBidirectionallyUnaryScenario(self):
- self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario())
-
- def testBidirectionallyStreamingScenario(self):
- self._perform_scenario_test(
- _proto_scenarios.BidirectionallyStreamingScenario())
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_metadata_test.py b/src/python/grpcio/tests/unit/_metadata_test.py
new file mode 100644
index 0000000000..2cb13f236b
--- /dev/null
+++ b/src/python/grpcio/tests/unit/_metadata_test.py
@@ -0,0 +1,216 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+"""Tests server and client side metadata API."""
+
+import unittest
+import weakref
+
+import grpc
+from grpc import _grpcio_metadata
+from grpc.framework.foundation import logging_pool
+
+from tests.unit import test_common
+from tests.unit.framework.common import test_constants
+
+_CHANNEL_ARGS = (('grpc.primary_user_agent', 'primary-agent'),
+ ('grpc.secondary_user_agent', 'secondary-agent'))
+
+_REQUEST = b'\x00\x00\x00'
+_RESPONSE = b'\x00\x00\x00'
+
+_UNARY_UNARY = b'/test/UnaryUnary'
+_UNARY_STREAM = b'/test/UnaryStream'
+_STREAM_UNARY = b'/test/StreamUnary'
+_STREAM_STREAM = b'/test/StreamStream'
+
+_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
+
+_CLIENT_METADATA = (
+ (b'client-md-key', b'client-md-key'),
+ (b'client-md-key-bin', b'\x00\x01')
+)
+
+_SERVER_INITIAL_METADATA = (
+ (b'server-initial-md-key', b'server-initial-md-value'),
+ (b'server-initial-md-key-bin', b'\x00\x02')
+)
+
+_SERVER_TRAILING_METADATA = (
+ (b'server-trailing-md-key', b'server-trailing-md-value'),
+ (b'server-trailing-md-key-bin', b'\x00\x03')
+)
+
+
+def user_agent(metadata):
+ for key, val in metadata:
+ if key == b'user-agent':
+ return val.decode('ascii')
+ raise KeyError('No user agent!')
+
+
+def validate_client_metadata(test, servicer_context):
+ test.assertTrue(test_common.metadata_transmitted(
+ _CLIENT_METADATA, servicer_context.invocation_metadata()))
+ test.assertTrue(user_agent(servicer_context.invocation_metadata())
+ .startswith('primary-agent ' + _USER_AGENT))
+ test.assertTrue(user_agent(servicer_context.invocation_metadata())
+ .endswith('secondary-agent'))
+
+
+def handle_unary_unary(test, request, servicer_context):
+ validate_client_metadata(test, servicer_context)
+ servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA)
+ servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA)
+ return _RESPONSE
+
+
+def handle_unary_stream(test, request, servicer_context):
+ validate_client_metadata(test, servicer_context)
+ servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA)
+ servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA)
+ for _ in range(test_constants.STREAM_LENGTH):
+ yield _RESPONSE
+
+
+def handle_stream_unary(test, request_iterator, servicer_context):
+ validate_client_metadata(test, servicer_context)
+ servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA)
+ servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA)
+ # TODO(issue:#6891) We should be able to remove this loop
+ for request in request_iterator:
+ pass
+ return _RESPONSE
+
+
+def handle_stream_stream(test, request_iterator, servicer_context):
+ validate_client_metadata(test, servicer_context)
+ servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA)
+ servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA)
+ # TODO(issue:#6891) We should be able to remove this loop,
+ # and replace with return; yield
+ for request in request_iterator:
+ yield _RESPONSE
+
+
+class _MethodHandler(grpc.RpcMethodHandler):
+
+ def __init__(self, test, request_streaming, response_streaming):
+ self.request_streaming = request_streaming
+ self.response_streaming = response_streaming
+ self.request_deserializer = None
+ self.response_serializer = None
+ self.unary_unary = None
+ self.unary_stream = None
+ self.stream_unary = None
+ self.stream_stream = None
+ if self.request_streaming and self.response_streaming:
+ self.stream_stream = lambda x, y: handle_stream_stream(test, x, y)
+ elif self.request_streaming:
+ self.stream_unary = lambda x, y: handle_stream_unary(test, x, y)
+ elif self.response_streaming:
+ self.unary_stream = lambda x, y: handle_unary_stream(test, x, y)
+ else:
+ self.unary_unary = lambda x, y: handle_unary_unary(test, x, y)
+
+
+class _GenericHandler(grpc.GenericRpcHandler):
+
+ def __init__(self, test):
+ self._test = test
+
+ def service(self, handler_call_details):
+ if handler_call_details.method == _UNARY_UNARY:
+ return _MethodHandler(self._test, False, False)
+ elif handler_call_details.method == _UNARY_STREAM:
+ return _MethodHandler(self._test, False, True)
+ elif handler_call_details.method == _STREAM_UNARY:
+ return _MethodHandler(self._test, True, False)
+ elif handler_call_details.method == _STREAM_STREAM:
+ return _MethodHandler(self._test, True, True)
+ else:
+ return None
+
+
+class MetadataTest(unittest.TestCase):
+
+ def setUp(self):
+ self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+ self._server = grpc.server((_GenericHandler(weakref.proxy(self)),),
+ self._server_pool)
+ port = self._server.add_insecure_port('[::]:0')
+ self._server.start()
+ self._channel = grpc.insecure_channel('localhost:%d' % port,
+ options=_CHANNEL_ARGS)
+
+ def tearDown(self):
+ self._server.stop(0)
+
+ def testUnaryUnary(self):
+ multi_callable = self._channel.unary_unary(_UNARY_UNARY)
+ unused_response, call = multi_callable.with_call(
+ _REQUEST, metadata=_CLIENT_METADATA)
+ self.assertTrue(test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, call.initial_metadata()))
+ self.assertTrue(test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA, call.trailing_metadata()))
+
+ def testUnaryStream(self):
+ multi_callable = self._channel.unary_stream(_UNARY_STREAM)
+ call = multi_callable(_REQUEST, metadata=_CLIENT_METADATA)
+ self.assertTrue(test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, call.initial_metadata()))
+ for _ in call:
+ pass
+ self.assertTrue(test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA, call.trailing_metadata()))
+
+ def testStreamUnary(self):
+ multi_callable = self._channel.stream_unary(_STREAM_UNARY)
+ unused_response, call = multi_callable.with_call(
+ [_REQUEST] * test_constants.STREAM_LENGTH,
+ metadata=_CLIENT_METADATA)
+ self.assertTrue(test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, call.initial_metadata()))
+ self.assertTrue(test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA, call.trailing_metadata()))
+
+ def testStreamStream(self):
+ multi_callable = self._channel.stream_stream(_STREAM_STREAM)
+ call = multi_callable([_REQUEST] * test_constants.STREAM_LENGTH,
+ metadata=_CLIENT_METADATA)
+ self.assertTrue(test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, call.initial_metadata()))
+ for _ in call:
+ pass
+ self.assertTrue(test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA, call.trailing_metadata()))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_rpc_test.py b/src/python/grpcio/tests/unit/_rpc_test.py
index 8407593c86..9814504edf 100644
--- a/src/python/grpcio/tests/unit/_rpc_test.py
+++ b/src/python/grpcio/tests/unit/_rpc_test.py
@@ -27,7 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Test of gRPC Python's application-layer API."""
+"""Test of RPCs made against gRPC Python's application-layer API."""
import itertools
import threading
@@ -216,10 +216,9 @@ class RPCTest(unittest.TestCase):
expected_response = self._handler.handle_unary_unary(request, None)
multi_callable = _unary_unary_multi_callable(self._channel)
- response, call = multi_callable(
+ response, call = multi_callable.with_call(
request, metadata=(
- (b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),),
- with_call=True)
+ (b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),))
self.assertEqual(expected_response, response)
self.assertIs(grpc.StatusCode.OK, call.code())
@@ -266,11 +265,11 @@ class RPCTest(unittest.TestCase):
request_iterator = iter(requests)
multi_callable = _stream_unary_multi_callable(self._channel)
- response, call = multi_callable(
+ response, call = multi_callable.with_call(
request_iterator,
metadata=(
(b'test', b'SuccessfulStreamRequestBlockingUnaryResponseWithCall'),
- ), with_call=True)
+ ))
self.assertEqual(expected_response, response)
self.assertIs(grpc.StatusCode.OK, call.code())
@@ -525,10 +524,9 @@ class RPCTest(unittest.TestCase):
multi_callable = _unary_unary_multi_callable(self._channel)
with self._control.pause():
with self.assertRaises(grpc.RpcError) as exception_context:
- multi_callable(
+ multi_callable.with_call(
request, timeout=test_constants.SHORT_TIMEOUT,
- metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),),
- with_call=True)
+ metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),))
self.assertIsNotNone(exception_context.exception.initial_metadata())
self.assertIs(
@@ -640,10 +638,9 @@ class RPCTest(unittest.TestCase):
multi_callable = _unary_unary_multi_callable(self._channel)
with self._control.fail():
with self.assertRaises(grpc.RpcError) as exception_context:
- multi_callable(
+ multi_callable.with_call(
request,
- metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),),
- with_call=True)
+ metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),))
self.assertIs(grpc.StatusCode.UNKNOWN, exception_context.exception.code())
diff --git a/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py b/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py
index 5dc8720639..488f7d7141 100644
--- a/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py
+++ b/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py
@@ -187,5 +187,15 @@ class ChannelConnectivityTest(unittest.TestCase):
server_completion_queue_thread.join()
+class ConnectivityStatesTest(unittest.TestCase):
+
+ def testBetaConnectivityStates(self):
+ self.assertIsNotNone(interfaces.ChannelConnectivity.IDLE)
+ self.assertIsNotNone(interfaces.ChannelConnectivity.CONNECTING)
+ self.assertIsNotNone(interfaces.ChannelConnectivity.READY)
+ self.assertIsNotNone(interfaces.ChannelConnectivity.TRANSIENT_FAILURE)
+ self.assertIsNotNone(interfaces.ChannelConnectivity.FATAL_FAILURE)
+
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/beta/test_utilities.py b/src/python/grpcio/tests/unit/beta/test_utilities.py
index e374b203ce..8ccad04e05 100644
--- a/src/python/grpcio/tests/unit/beta/test_utilities.py
+++ b/src/python/grpcio/tests/unit/beta/test_utilities.py
@@ -50,6 +50,6 @@ def not_really_secure_channel(
"""
target = '%s:%d' % (host, port)
channel = grpc.secure_channel(
- target, channel_credentials._credentials,
+ target, channel_credentials,
((b'grpc.ssl_target_name_override', server_host_override,),))
return implementations.Channel(channel)
diff --git a/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py b/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py
deleted file mode 100644
index 43457be362..0000000000
--- a/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py
+++ /dev/null
@@ -1,113 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests Face interface compliance of the crust-over-core stack."""
-
-import collections
-import unittest
-
-import six
-
-from grpc.framework.core import implementations as core_implementations
-from grpc.framework.crust import implementations as crust_implementations
-from grpc.framework.foundation import logging_pool
-from grpc.framework.interfaces.links import utilities
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.face import test_cases
-from tests.unit.framework.interfaces.face import test_interfaces
-from tests.unit.framework.interfaces.links import test_utilities
-
-
-class _Implementation(test_interfaces.Implementation):
-
- def instantiate(
- self, methods, method_implementations, multi_method_implementation):
- pool = logging_pool.pool(test_constants.POOL_SIZE)
- servicer = crust_implementations.servicer(
- method_implementations, multi_method_implementation, pool)
-
- service_end_link = core_implementations.service_end_link(
- servicer, test_constants.DEFAULT_TIMEOUT,
- test_constants.MAXIMUM_TIMEOUT)
- invocation_end_link = core_implementations.invocation_end_link()
- invocation_end_link.join_link(service_end_link)
- service_end_link.join_link(invocation_end_link)
- service_end_link.start()
- invocation_end_link.start()
-
- generic_stub = crust_implementations.generic_stub(invocation_end_link, pool)
- # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
- group = next(iter(methods))[0]
- # TODO(nathaniel): Add a "cardinalities_by_group" attribute to
- # _digest.TestServiceDigest.
- cardinalities = {
- method: method_object.cardinality()
- for (group, method), method_object in six.iteritems(methods)}
- dynamic_stub = crust_implementations.dynamic_stub(
- invocation_end_link, group, cardinalities, pool)
-
- return generic_stub, {group: dynamic_stub}, (
- invocation_end_link, service_end_link, pool)
-
- def destantiate(self, memo):
- invocation_end_link, service_end_link, pool = memo
- invocation_end_link.stop(0).wait()
- service_end_link.stop(0).wait()
- invocation_end_link.join_link(utilities.NULL_LINK)
- service_end_link.join_link(utilities.NULL_LINK)
- pool.shutdown(wait=True)
-
- def invocation_metadata(self):
- return object()
-
- def initial_metadata(self):
- return object()
-
- def terminal_metadata(self):
- return object()
-
- def code(self):
- return object()
-
- def details(self):
- return object()
-
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- return original_metadata is transmitted_metadata
-
-
-def load_tests(loader, tests, pattern):
- return unittest.TestSuite(
- tests=tuple(
- loader.loadTestsFromTestCase(test_case_class)
- for test_case_class in test_cases.test_cases(_Implementation())))
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py b/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py
deleted file mode 100644
index 1310292306..0000000000
--- a/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests the RPC Framework Core's implementation of the Base interface."""
-
-import logging
-import random
-import time
-import unittest
-
-from grpc.framework.core import implementations
-from grpc.framework.interfaces.base import utilities
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.base import test_cases
-from tests.unit.framework.interfaces.base import test_interfaces
-
-
-class _Implementation(test_interfaces.Implementation):
-
- def __init__(self):
- self._invocation_initial_metadata = object()
- self._service_initial_metadata = object()
- self._invocation_terminal_metadata = object()
- self._service_terminal_metadata = object()
-
- def instantiate(self, serializations, servicer):
- invocation = implementations.invocation_end_link()
- service = implementations.service_end_link(
- servicer, test_constants.DEFAULT_TIMEOUT,
- test_constants.MAXIMUM_TIMEOUT)
- invocation.join_link(service)
- service.join_link(invocation)
- return invocation, service, None
-
- def destantiate(self, memo):
- pass
-
- def invocation_initial_metadata(self):
- return self._invocation_initial_metadata
-
- def service_initial_metadata(self):
- return self._service_initial_metadata
-
- def invocation_completion(self):
- return utilities.completion(self._invocation_terminal_metadata, None, None)
-
- def service_completion(self):
- return utilities.completion(self._service_terminal_metadata, None, None)
-
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- return transmitted_metadata is original_metadata
-
- def completion_transmitted(self, original_completion, transmitted_completion):
- return (
- (original_completion.terminal_metadata is
- transmitted_completion.terminal_metadata) and
- original_completion.code is transmitted_completion.code and
- original_completion.message is transmitted_completion.message
- )
-
-
-def load_tests(loader, tests, pattern):
- return unittest.TestSuite(
- tests=tuple(
- loader.loadTestsFromTestCase(test_case_class)
- for test_case_class in test_cases.test_cases(_Implementation())))
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/framework/foundation/_later_test.py b/src/python/grpcio/tests/unit/framework/foundation/_later_test.py
deleted file mode 100644
index 6c2459e185..0000000000
--- a/src/python/grpcio/tests/unit/framework/foundation/_later_test.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests of the later module."""
-
-import threading
-import time
-import unittest
-
-from grpc.framework.foundation import later
-
-TICK = 0.1
-
-
-class LaterTest(unittest.TestCase):
-
- def test_simple_delay(self):
- lock = threading.Lock()
- cell = [0]
- return_value = object()
-
- def computation():
- with lock:
- cell[0] += 1
- return return_value
- computation_future = later.later(TICK * 2, computation)
-
- self.assertFalse(computation_future.done())
- self.assertFalse(computation_future.cancelled())
- time.sleep(TICK)
- self.assertFalse(computation_future.done())
- self.assertFalse(computation_future.cancelled())
- with lock:
- self.assertEqual(0, cell[0])
- time.sleep(TICK * 2)
- self.assertTrue(computation_future.done())
- self.assertFalse(computation_future.cancelled())
- with lock:
- self.assertEqual(1, cell[0])
- self.assertEqual(return_value, computation_future.result())
-
- def test_callback(self):
- lock = threading.Lock()
- cell = [0]
- callback_called = [False]
- future_passed_to_callback = [None]
- def computation():
- with lock:
- cell[0] += 1
- computation_future = later.later(TICK * 2, computation)
- def callback(outcome):
- with lock:
- callback_called[0] = True
- future_passed_to_callback[0] = outcome
- computation_future.add_done_callback(callback)
- time.sleep(TICK)
- with lock:
- self.assertFalse(callback_called[0])
- time.sleep(TICK * 2)
- with lock:
- self.assertTrue(callback_called[0])
- self.assertTrue(future_passed_to_callback[0].done())
-
- callback_called[0] = False
- future_passed_to_callback[0] = None
-
- computation_future.add_done_callback(callback)
- with lock:
- self.assertTrue(callback_called[0])
- self.assertTrue(future_passed_to_callback[0].done())
-
- def test_cancel(self):
- lock = threading.Lock()
- cell = [0]
- callback_called = [False]
- future_passed_to_callback = [None]
- def computation():
- with lock:
- cell[0] += 1
- computation_future = later.later(TICK * 2, computation)
- def callback(outcome):
- with lock:
- callback_called[0] = True
- future_passed_to_callback[0] = outcome
- computation_future.add_done_callback(callback)
- time.sleep(TICK)
- with lock:
- self.assertFalse(callback_called[0])
- computation_future.cancel()
- self.assertTrue(computation_future.cancelled())
- self.assertFalse(computation_future.running())
- self.assertTrue(computation_future.done())
- with lock:
- self.assertTrue(callback_called[0])
- self.assertTrue(future_passed_to_callback[0].cancelled())
-
- def test_result(self):
- lock = threading.Lock()
- cell = [0]
- callback_called = [False]
- future_passed_to_callback_cell = [None]
- return_value = object()
-
- def computation():
- with lock:
- cell[0] += 1
- return return_value
- computation_future = later.later(TICK * 2, computation)
-
- def callback(future_passed_to_callback):
- with lock:
- callback_called[0] = True
- future_passed_to_callback_cell[0] = future_passed_to_callback
- computation_future.add_done_callback(callback)
- returned_value = computation_future.result()
- self.assertEqual(return_value, returned_value)
-
- # The callback may not yet have been called! Sleep a tick.
- time.sleep(TICK)
- with lock:
- self.assertTrue(callback_called[0])
- self.assertEqual(return_value, future_passed_to_callback_cell[0].result())
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)