diff options
Diffstat (limited to 'src/python')
32 files changed, 2607 insertions, 8 deletions
diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py index dcb46459a2..a4eb358c4e 100644 --- a/src/python/grpcio/grpc/_grpcio_metadata.py +++ b/src/python/grpcio/grpc/_grpcio_metadata.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!! -__version__ = """1.5.0.dev0""" +__version__ = """1.7.0.dev0""" diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 605044b65e..dc4d28f95b 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -92,6 +92,9 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/ev_windows.c', 'src/core/lib/iomgr/exec_ctx.c', 'src/core/lib/iomgr/executor.c', + 'src/core/lib/iomgr/gethostname_fallback.c', + 'src/core/lib/iomgr/gethostname_host_name_max.c', + 'src/core/lib/iomgr/gethostname_sysconf.c', 'src/core/lib/iomgr/iocp_windows.c', 'src/core/lib/iomgr/iomgr.c', 'src/core/lib/iomgr/iomgr_posix.c', @@ -193,6 +196,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/transport/chttp2/transport/bin_encoder.c', 'src/core/ext/transport/chttp2/transport/chttp2_plugin.c', 'src/core/ext/transport/chttp2/transport/chttp2_transport.c', + 'src/core/ext/transport/chttp2/transport/flow_control.c', 'src/core/ext/transport/chttp2/transport/frame_data.c', 'src/core/ext/transport/chttp2/transport/frame_goaway.c', 'src/core/ext/transport/chttp2/transport/frame_ping.c', @@ -242,6 +246,7 @@ CORE_SOURCE_FILES = [ 'src/core/tsi/fake_transport_security.c', 'src/core/tsi/gts_transport_security.c', 'src/core/tsi/ssl_transport_security.c', + 'src/core/tsi/transport_security_grpc.c', 'src/core/tsi/transport_security.c', 'src/core/tsi/transport_security_adapter.c', 'src/core/ext/transport/chttp2/server/chttp2_server.c', diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index 1bbd1e149e..3194a893d7 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION='1.5.0.dev0' +VERSION='1.7.0.dev0' diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py index 8aa5118f39..ef68bad17a 100644 --- a/src/python/grpcio_health_checking/grpc_version.py +++ b/src/python/grpcio_health_checking/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!! -VERSION='1.5.0.dev0' +VERSION='1.7.0.dev0' diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py index 83e0ead391..0299b4cca9 100644 --- a/src/python/grpcio_health_checking/setup.py +++ b/src/python/grpcio_health_checking/setup.py @@ -24,6 +24,18 @@ os.chdir(os.path.dirname(os.path.abspath(__file__))) import health_commands import grpc_version +CLASSIFIERS = [ + 'Development Status :: 5 - Production/Stable', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'License :: OSI Approved :: Apache Software License', +], + PACKAGE_DIRECTORIES = { '': '.', } @@ -48,6 +60,7 @@ setuptools.setup( author_email='grpc-io@googlegroups.com', url='https://grpc.io', license='Apache License 2.0', + classifiers=CLASSIFIERS, package_dir=PACKAGE_DIRECTORIES, packages=setuptools.find_packages('.'), install_requires=INSTALL_REQUIRES, diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py index a23b9b5edb..55ab959cc5 100644 --- a/src/python/grpcio_reflection/grpc_version.py +++ b/src/python/grpcio_reflection/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!! -VERSION='1.5.0.dev0' +VERSION='1.7.0.dev0' diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py index 20edbc4ec0..bed2311b59 100644 --- a/src/python/grpcio_reflection/setup.py +++ b/src/python/grpcio_reflection/setup.py @@ -25,6 +25,18 @@ os.chdir(os.path.dirname(os.path.abspath(__file__))) import reflection_commands import grpc_version +CLASSIFIERS = [ + 'Development Status :: 5 - Production/Stable', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'License :: OSI Approved :: Apache Software License', +], + PACKAGE_DIRECTORIES = { '': '.', } @@ -48,6 +60,7 @@ setuptools.setup( description='Standard Protobuf Reflection Service for gRPC', author='The gRPC Authors', author_email='grpc-io@googlegroups.com', + classifiers=CLASSIFIERS, url='https://grpc.io', package_dir=PACKAGE_DIRECTORIES, packages=setuptools.find_packages('.'), diff --git a/src/python/grpcio_testing/grpc_testing/__init__.py b/src/python/grpcio_testing/grpc_testing/__init__.py new file mode 100644 index 0000000000..14e25f09e2 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/__init__.py @@ -0,0 +1,408 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Objects for use in testing gRPC Python-using application code.""" + +import abc + +from google.protobuf import descriptor +import six + +import grpc + + +class UnaryUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a unary-unary RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to + the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, response, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + response: The response for the RPC. + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class UnaryStreamChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a unary-stream RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to + the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def send_response(self, response): + """Sends a response to the system under test. + + Args: + response: A response message to be "sent" to the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class StreamUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a stream-unary RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to + the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_request(self): + """Draws one of the requests added to the RPC by the system under test. + + This method blocks until the system under test has added to the RPC + the request to be returned. + + Successive calls to this method return requests in the same order in + which the system under test added them to the RPC. + + Returns: + A request message added to the RPC by the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def requests_closed(self): + """Blocks until the system under test has closed the request stream.""" + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, response, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + response: The response for the RPC. + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class StreamStreamChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a stream-stream RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to the + system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_request(self): + """Draws one of the requests added to the RPC by the system under test. + + This method blocks until the system under test has added to the RPC + the request to be returned. + + Successive calls to this method return requests in the same order in + which the system under test added them to the RPC. + + Returns: + A request message added to the RPC by the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def send_response(self, response): + """Sends a response to the system under test. + + Args: + response: A response messages to be "sent" to the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def requests_closed(self): + """Blocks until the system under test has closed the request stream.""" + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class Channel(six.with_metaclass(abc.ABCMeta), grpc.Channel): + """A grpc.Channel double with which to test a system that invokes RPCs.""" + + @abc.abstractmethod + def take_unary_unary(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + unary-unary RPC method. + + Returns: + A (invocation_metadata, request, unary_unary_channel_rpc) tuple of + the RPC's invocation metadata, its request, and a + UnaryUnaryChannelRpc with which to "play server" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_unary_stream(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + unary-stream RPC method. + + Returns: + A (invocation_metadata, request, unary_stream_channel_rpc) tuple of + the RPC's invocation metadata, its request, and a + UnaryStreamChannelRpc with which to "play server" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_stream_unary(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + stream-unary RPC method. + + Returns: + A (invocation_metadata, stream_unary_channel_rpc) tuple of the RPC's + invocation metadata and a StreamUnaryChannelRpc with which to "play + server" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_stream_stream(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + stream-stream RPC method. + + Returns: + A (invocation_metadata, stream_stream_channel_rpc) tuple of the RPC's + invocation metadata and a StreamStreamChannelRpc with which to + "play server" for the RPC. + """ + raise NotImplementedError() + + +class Time(six.with_metaclass(abc.ABCMeta)): + """A simulation of time. + + Implementations needn't be connected with real time as provided by the + Python interpreter, but as long as systems under test use + RpcContext.is_active and RpcContext.time_remaining for querying RPC liveness + implementations may be used to change passage of time in tests. + """ + + @abc.abstractmethod + def time(self): + """Accesses the current test time. + + Returns: + The current test time (over which this object has authority). + """ + raise NotImplementedError() + + @abc.abstractmethod + def call_in(self, behavior, delay): + """Adds a behavior to be called after some time. + + Args: + behavior: A behavior to be called with no arguments. + delay: A duration of time in seconds after which to call the behavior. + + Returns: + A grpc.Future with which the call of the behavior may be cancelled + before it is executed. + """ + raise NotImplementedError() + + @abc.abstractmethod + def call_at(self, behavior, time): + """Adds a behavior to be called at a specific time. + + Args: + behavior: A behavior to be called with no arguments. + time: The test time at which to call the behavior. + + Returns: + A grpc.Future with which the call of the behavior may be cancelled + before it is executed. + """ + raise NotImplementedError() + + @abc.abstractmethod + def sleep_for(self, duration): + """Blocks for some length of test time. + + Args: + duration: A duration of test time in seconds for which to block. + """ + raise NotImplementedError() + + @abc.abstractmethod + def sleep_until(self, time): + """Blocks until some test time. + + Args: + time: The test time until which to block. + """ + raise NotImplementedError() + + +def strict_real_time(): + """Creates a Time backed by the Python interpreter's time. + + The returned instance will be "strict" with respect to callbacks + submitted to it: it will ensure that all callbacks registered to + be called at time t have been called before it describes the time + as having advanced beyond t. + + Returns: + A Time backed by the "system" (Python interpreter's) time. + """ + from grpc_testing import _time + return _time.StrictRealTime() + + +def strict_fake_time(now): + """Creates a Time that can be manipulated by test code. + + The returned instance maintains an internal representation of time + independent of real time. This internal representation only advances + when user code calls the instance's sleep_for and sleep_until methods. + + The returned instance will be "strict" with respect to callbacks + submitted to it: it will ensure that all callbacks registered to + be called at time t have been called before it describes the time + as having advanced beyond t. + + Returns: + A Time that simulates the passage of time. + """ + from grpc_testing import _time + return _time.StrictFakeTime(now) + + +def channel(service_descriptors, time): + """Creates a Channel for use in tests of a gRPC Python-using system. + + Args: + service_descriptors: An iterable of descriptor.ServiceDescriptors + describing the RPCs that will be made on the returned Channel by the + system under test. + time: A Time to be used for tests. + + Returns: + A Channel for use in tests. + """ + from grpc_testing import _channel + return _channel.testing_channel(service_descriptors, time) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/__init__.py b/src/python/grpcio_testing/grpc_testing/_channel/__init__.py new file mode 100644 index 0000000000..8011975d0a --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/__init__.py @@ -0,0 +1,23 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from grpc_testing._channel import _channel +from grpc_testing._channel import _channel_state + + +# descriptors is reserved for later use. +# pylint: disable=unused-argument +def testing_channel(descriptors, time): + return _channel.TestingChannel(time, _channel_state.State()) +# pylint: enable=unused-argument diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py new file mode 100644 index 0000000000..fbd064db88 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py @@ -0,0 +1,62 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc_testing +from grpc_testing._channel import _channel_rpc +from grpc_testing._channel import _multi_callable + + +# All serializer and deserializer parameters are not (yet) used by this +# test infrastructure. +# pylint: disable=unused-argument +class TestingChannel(grpc_testing.Channel): + + def __init__(self, time, state): + self._time = time + self._state = state + + def subscribe(self, callback, try_to_connect=False): + raise NotImplementedError() + + def unsubscribe(self, callback): + raise NotImplementedError() + + def unary_unary( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.UnaryUnary(method, self._state) + + def unary_stream( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.UnaryStream(method, self._state) + + def stream_unary( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.StreamUnary(method, self._state) + + def stream_stream( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.StreamStream(method, self._state) + + def take_unary_unary(self, method_descriptor): + return _channel_rpc.unary_unary(self._state, method_descriptor) + + def take_unary_stream(self, method_descriptor): + return _channel_rpc.unary_stream(self._state, method_descriptor) + + def take_stream_unary(self, method_descriptor): + return _channel_rpc.stream_unary(self._state, method_descriptor) + + def take_stream_stream(self, method_descriptor): + return _channel_rpc.stream_stream(self._state, method_descriptor) +# pylint: enable=unused-argument diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py new file mode 100644 index 0000000000..762b6a035b --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py @@ -0,0 +1,119 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc_testing + + +class _UnaryUnary(grpc_testing.UnaryUnaryChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, response, trailing_metadata, code, details): + self._rpc_state.terminate_with_response( + response, trailing_metadata, code, details) + + +class _UnaryStream(grpc_testing.UnaryStreamChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def send_response(self, response): + self._rpc_state.send_response(response) + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, trailing_metadata, code, details): + self._rpc_state.terminate(trailing_metadata, code, details) + + +class _StreamUnary(grpc_testing.StreamUnaryChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def take_request(self): + return self._rpc_state.take_request() + + def requests_closed(self): + return self._rpc_state.requests_closed() + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, response, trailing_metadata, code, details): + self._rpc_state.terminate_with_response( + response, trailing_metadata, code, details) + + +class _StreamStream(grpc_testing.StreamStreamChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def take_request(self): + return self._rpc_state.take_request() + + def send_response(self, response): + self._rpc_state.send_response(response) + + def requests_closed(self): + return self._rpc_state.requests_closed() + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, trailing_metadata, code, details): + self._rpc_state.terminate(trailing_metadata, code, details) + + +def unary_unary(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + invocation_metadata, request = ( + rpc_state.take_invocation_metadata_and_request()) + return invocation_metadata, request, _UnaryUnary(rpc_state) + + +def unary_stream(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + invocation_metadata, request = ( + rpc_state.take_invocation_metadata_and_request()) + return invocation_metadata, request, _UnaryStream(rpc_state) + + +def stream_unary(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + return rpc_state.take_invocation_metadata(), _StreamUnary(rpc_state) + + +def stream_stream(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + return rpc_state.take_invocation_metadata(), _StreamStream(rpc_state) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py new file mode 100644 index 0000000000..569c41d79d --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py @@ -0,0 +1,48 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import threading + +from grpc_testing import _common +from grpc_testing._channel import _rpc_state + + +class State(_common.ChannelHandler): + + def __init__(self): + self._condition = threading.Condition() + self._rpc_states = collections.defaultdict(list) + + def invoke_rpc( + self, method_full_rpc_name, invocation_metadata, requests, + requests_closed, timeout): + rpc_state = _rpc_state.State( + invocation_metadata, requests, requests_closed) + with self._condition: + self._rpc_states[method_full_rpc_name].append(rpc_state) + self._condition.notify_all() + return rpc_state + + def take_rpc_state(self, method_descriptor): + method_full_rpc_name = '/{}/{}'.format( + method_descriptor.containing_service.full_name, + method_descriptor.name) + with self._condition: + while True: + method_rpc_states = self._rpc_states[method_full_rpc_name] + if method_rpc_states: + return method_rpc_states.pop(0) + else: + self._condition.wait() diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py new file mode 100644 index 0000000000..ebce652eeb --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py @@ -0,0 +1,322 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading + +import grpc + +_NOT_YET_OBSERVED = object() + + +def _cancel(handler): + return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!') + + +def _is_active(handler): + return handler.is_active() + + +def _time_remaining(unused_handler): + raise NotImplementedError() + + +def _add_callback(handler, callback): + return handler.add_callback(callback) + + +def _initial_metadata(handler): + return handler.initial_metadata() + + +def _trailing_metadata(handler): + trailing_metadata, unused_code, unused_details = handler.termination() + return trailing_metadata + + +def _code(handler): + unused_trailing_metadata, code, unused_details = handler.termination() + return code + + +def _details(handler): + unused_trailing_metadata, unused_code, details = handler.termination() + return details + + +class _Call(grpc.Call): + + def __init__(self, handler): + self._handler = handler + + def cancel(self): + _cancel(self._handler) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) + + +class _RpcErrorCall(grpc.RpcError, grpc.Call): + + def __init__(self, handler): + self._handler = handler + + def cancel(self): + _cancel(self._handler) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) + + +def _next(handler): + read = handler.take_response() + if read.code is None: + return read.response + elif read.code is grpc.StatusCode.OK: + raise StopIteration() + else: + raise _RpcErrorCall(handler) + + +class _HandlerExtras(object): + + def __init__(self): + self.condition = threading.Condition() + self.unary_response = _NOT_YET_OBSERVED + self.cancelled = False + + +def _with_extras_cancel(handler, extras): + with extras.condition: + if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'): + extras.cancelled = True + return True + else: + return False + + +def _extras_without_cancelled(extras): + with extras.condition: + return extras.cancelled + + +def _running(handler): + return handler.is_active() + + +def _done(handler): + return not handler.is_active() + + +def _with_extras_unary_response(handler, extras): + with extras.condition: + if extras.unary_response is _NOT_YET_OBSERVED: + read = handler.take_response() + if read.code is None: + extras.unary_response = read.response + return read.response + else: + raise _RpcErrorCall(handler) + else: + return extras.unary_response + + +def _exception(unused_handler): + raise NotImplementedError('TODO!') + + +def _traceback(unused_handler): + raise NotImplementedError('TODO!') + + +def _add_done_callback(handler, callback, future): + adapted_callback = lambda: callback(future) + if not handler.add_callback(adapted_callback): + callback(future) + + +class _FutureCall(grpc.Future, grpc.Call): + + def __init__(self, handler, extras): + self._handler = handler + self._extras = extras + + def cancel(self): + return _with_extras_cancel(self._handler, self._extras) + + def cancelled(self): + return _extras_without_cancelled(self._extras) + + def running(self): + return _running(self._handler) + + def done(self): + return _done(self._handler) + + def result(self): + return _with_extras_unary_response(self._handler, self._extras) + + def exception(self): + return _exception(self._handler) + + def traceback(self): + return _traceback(self._handler) + + def add_done_callback(self, fn): + _add_done_callback(self._handler, fn, self) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) + + +def consume_requests(request_iterator, handler): + + def _consume(): + while True: + try: + request = next(request_iterator) + added = handler.add_request(request) + if not added: + break + except StopIteration: + handler.close_requests() + break + except Exception: # pylint: disable=broad-except + details = 'Exception iterating requests!' + logging.exception(details) + handler.cancel(grpc.StatusCode.UNKNOWN, details) + + consumption = threading.Thread(target=_consume) + consumption.start() + + +def blocking_unary_response(handler): + read = handler.take_response() + if read.code is None: + unused_trailing_metadata, code, unused_details = handler.termination() + if code is grpc.StatusCode.OK: + return read.response + else: + raise _RpcErrorCall(handler) + else: + raise _RpcErrorCall(handler) + + +def blocking_unary_response_with_call(handler): + read = handler.take_response() + if read.code is None: + unused_trailing_metadata, code, unused_details = handler.termination() + if code is grpc.StatusCode.OK: + return read.response, _Call(handler) + else: + raise _RpcErrorCall(handler) + else: + raise _RpcErrorCall(handler) + + +def future_call(handler): + return _FutureCall(handler, _HandlerExtras()) + + +class ResponseIteratorCall(grpc.Call): + + def __init__(self, handler): + self._handler = handler + + def __iter__(self): + return self + + def __next__(self): + return _next(self._handler) + + def next(self): + return _next(self._handler) + + def cancel(self): + _cancel(self._handler) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py b/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py new file mode 100644 index 0000000000..fe69257f5b --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py @@ -0,0 +1,115 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc +from grpc_testing import _common +from grpc_testing._channel import _invocation + +# All per-call credentials parameters are unused by this test infrastructure. +# pylint: disable=unused-argument +class UnaryUnary(grpc.UnaryUnaryMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) + return _invocation.blocking_unary_response(rpc_handler) + + def with_call(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) + return _invocation.blocking_unary_response_with_call(rpc_handler) + + def future(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) + return _invocation.future_call(rpc_handler) + + +class UnaryStream(grpc.StreamStreamMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [request], True, timeout) + return _invocation.ResponseIteratorCall(rpc_handler) + + +class StreamUnary(grpc.StreamUnaryMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.blocking_unary_response(rpc_handler) + + def with_call(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.blocking_unary_response_with_call(rpc_handler) + + def future(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.future_call(rpc_handler) + + +class StreamStream(grpc.StreamStreamMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.ResponseIteratorCall(rpc_handler) +# pylint: enable=unused-argument diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py b/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py new file mode 100644 index 0000000000..e1fa49a2a8 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py @@ -0,0 +1,193 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading + +import grpc +from grpc_testing import _common + + +class State(_common.ChannelRpcHandler): + + def __init__(self, invocation_metadata, requests, requests_closed): + self._condition = threading.Condition() + self._invocation_metadata = invocation_metadata + self._requests = requests + self._requests_closed = requests_closed + self._initial_metadata = None + self._responses = [] + self._trailing_metadata = None + self._code = None + self._details = None + + def initial_metadata(self): + with self._condition: + while True: + if self._initial_metadata is None: + if self._code is None: + self._condition.wait() + else: + return _common.FUSSED_EMPTY_METADATA + else: + return self._initial_metadata + + def add_request(self, request): + with self._condition: + if self._code is None and not self._requests_closed: + self._requests.append(request) + self._condition.notify_all() + return True + else: + return False + + def close_requests(self): + with self._condition: + if self._code is None and not self._requests_closed: + self._requests_closed = True + self._condition.notify_all() + + def take_response(self): + with self._condition: + while True: + if self._code is grpc.StatusCode.OK: + if self._responses: + response = self._responses.pop(0) + return _common.ChannelRpcRead( + response, None, None, None) + else: + return _common.ChannelRpcRead( + None, self._trailing_metadata, + grpc.StatusCode.OK, self._details) + elif self._code is None: + if self._responses: + response = self._responses.pop(0) + return _common.ChannelRpcRead( + response, None, None, None) + else: + self._condition.wait() + else: + return _common.ChannelRpcRead( + None, self._trailing_metadata, self._code, + self._details) + + def termination(self): + with self._condition: + while True: + if self._code is None: + self._condition.wait() + else: + return self._trailing_metadata, self._code, self._details + + def cancel(self, code, details): + with self._condition: + if self._code is None: + if self._initial_metadata is None: + self._initial_metadata = _common.FUSSED_EMPTY_METADATA + self._trailing_metadata = _common.FUSSED_EMPTY_METADATA + self._code = code + self._details = details + self._condition.notify_all() + return True + else: + return False + + def take_invocation_metadata(self): + with self._condition: + if self._invocation_metadata is None: + raise ValueError('Expected invocation metadata!') + else: + invocation_metadata = self._invocation_metadata + self._invocation_metadata = None + return invocation_metadata + + def take_invocation_metadata_and_request(self): + with self._condition: + if self._invocation_metadata is None: + raise ValueError('Expected invocation metadata!') + elif not self._requests: + raise ValueError('Expected at least one request!') + else: + invocation_metadata = self._invocation_metadata + self._invocation_metadata = None + return invocation_metadata, self._requests.pop(0) + + def send_initial_metadata(self, initial_metadata): + with self._condition: + self._initial_metadata = _common.fuss_with_metadata( + initial_metadata) + self._condition.notify_all() + + def take_request(self): + with self._condition: + while True: + if self._requests: + return self._requests.pop(0) + else: + self._condition.wait() + + def requests_closed(self): + with self._condition: + while True: + if self._requests_closed: + return + else: + self._condition.wait() + + def send_response(self, response): + with self._condition: + if self._code is None: + self._responses.append(response) + self._condition.notify_all() + + def terminate_with_response( + self, response, trailing_metadata, code, details): + with self._condition: + if self._initial_metadata is None: + self._initial_metadata = _common.FUSSED_EMPTY_METADATA + self._responses.append(response) + self._trailing_metadata = _common.fuss_with_metadata( + trailing_metadata) + self._code = code + self._details = details + self._condition.notify_all() + + def terminate(self, trailing_metadata, code, details): + with self._condition: + if self._initial_metadata is None: + self._initial_metadata = _common.FUSSED_EMPTY_METADATA + self._trailing_metadata = _common.fuss_with_metadata( + trailing_metadata) + self._code = code + self._details = details + self._condition.notify_all() + + def cancelled(self): + with self._condition: + while True: + if self._code is grpc.StatusCode.CANCELLED: + return + elif self._code is None: + self._condition.wait() + else: + raise ValueError( + 'Status code unexpectedly {}!'.format(self._code)) + + def is_active(self): + raise NotImplementedError() + + def time_remaining(self): + raise NotImplementedError() + + def add_callback(self, callback): + raise NotImplementedError() diff --git a/src/python/grpcio_testing/grpc_testing/_common.py b/src/python/grpcio_testing/grpc_testing/_common.py new file mode 100644 index 0000000000..cb4a7f5fa2 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_common.py @@ -0,0 +1,92 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Common interfaces and implementation.""" + +import abc +import collections + +import six + + +def _fuss(tuplified_metadata): + return tuplified_metadata + ( + ( + 'grpc.metadata_added_by_runtime', + 'gRPC is allowed to add metadata in transmission and does so.', + ), + ) + +FUSSED_EMPTY_METADATA = _fuss(()) + + +def fuss_with_metadata(metadata): + if metadata is None: + return FUSSED_EMPTY_METADATA + else: + return _fuss(tuple(metadata)) + + +class ChannelRpcRead( + collections.namedtuple( + 'ChannelRpcRead', + ('response', 'trailing_metadata', 'code', 'details',))): + pass + + +class ChannelRpcHandler(six.with_metaclass(abc.ABCMeta)): + + @abc.abstractmethod + def initial_metadata(self): + raise NotImplementedError() + + @abc.abstractmethod + def add_request(self, request): + raise NotImplementedError() + + @abc.abstractmethod + def close_requests(self): + raise NotImplementedError() + + @abc.abstractmethod + def take_response(self): + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self, code, details): + raise NotImplementedError() + + @abc.abstractmethod + def termination(self): + raise NotImplementedError() + + @abc.abstractmethod + def is_active(self): + raise NotImplementedError() + + @abc.abstractmethod + def time_remaining(self): + raise NotImplementedError() + + @abc.abstractmethod + def add_callback(self, callback): + raise NotImplementedError() + + +class ChannelHandler(six.with_metaclass(abc.ABCMeta)): + + @abc.abstractmethod + def invoke_rpc( + self, method_full_rpc_name, invocation_metadata, requests, + requests_closed, timeout): + raise NotImplementedError() diff --git a/src/python/grpcio_testing/grpc_testing/_time.py b/src/python/grpcio_testing/grpc_testing/_time.py new file mode 100644 index 0000000000..3b1ab4bcd8 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_time.py @@ -0,0 +1,224 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test times.""" + +import collections +import logging +import threading +import time as _time + +import grpc +import grpc_testing + + +def _call(behaviors): + for behavior in behaviors: + try: + behavior() + except Exception: # pylint: disable=broad-except + logging.exception('Exception calling behavior "%r"!', behavior) + + +def _call_in_thread(behaviors): + calling = threading.Thread(target=_call, args=(behaviors,)) + calling.start() + # NOTE(nathaniel): Because this function is called from "strict" Time + # implementations, it blocks until after all behaviors have terminated. + calling.join() + + +class _State(object): + + def __init__(self): + self.condition = threading.Condition() + self.times_to_behaviors = collections.defaultdict(list) + + +class _Delta( + collections.namedtuple('_Delta', + ('mature_behaviors', 'earliest_mature_time', + 'earliest_immature_time',))): + pass + + +def _process(state, now): + mature_behaviors = [] + earliest_mature_time = None + while state.times_to_behaviors: + earliest_time = min(state.times_to_behaviors) + if earliest_time <= now: + if earliest_mature_time is None: + earliest_mature_time = earliest_time + earliest_mature_behaviors = state.times_to_behaviors.pop( + earliest_time) + mature_behaviors.extend(earliest_mature_behaviors) + else: + earliest_immature_time = earliest_time + break + else: + earliest_immature_time = None + return _Delta(mature_behaviors, earliest_mature_time, + earliest_immature_time) + + +class _Future(grpc.Future): + + def __init__(self, state, behavior, time): + self._state = state + self._behavior = behavior + self._time = time + self._cancelled = False + + def cancel(self): + with self._state.condition: + if self._cancelled: + return True + else: + behaviors_at_time = self._state.times_to_behaviors.get( + self._time) + if behaviors_at_time is None: + return False + else: + behaviors_at_time.remove(self._behavior) + if not behaviors_at_time: + self._state.times_to_behaviors.pop(self._time) + self._state.condition.notify_all() + self._cancelled = True + return True + + def cancelled(self): + with self._state.condition: + return self._cancelled + + def running(self): + raise NotImplementedError() + + def done(self): + raise NotImplementedError() + + def result(self, timeout=None): + raise NotImplementedError() + + def exception(self, timeout=None): + raise NotImplementedError() + + def traceback(self, timeout=None): + raise NotImplementedError() + + def add_done_callback(self, fn): + raise NotImplementedError() + + +class StrictRealTime(grpc_testing.Time): + + def __init__(self): + self._state = _State() + self._active = False + self._calling = None + + def _activity(self): + while True: + with self._state.condition: + while True: + now = _time.time() + delta = _process(self._state, now) + self._state.condition.notify_all() + if delta.mature_behaviors: + self._calling = delta.earliest_mature_time + break + self._calling = None + if delta.earliest_immature_time is None: + self._active = False + return + else: + timeout = max(0, delta.earliest_immature_time - now) + self._state.condition.wait(timeout=timeout) + _call(delta.mature_behaviors) + + def _ensure_called_through(self, time): + with self._state.condition: + while ((self._state.times_to_behaviors and + min(self._state.times_to_behaviors) < time) or + (self._calling is not None and self._calling < time)): + self._state.condition.wait() + + def _call_at(self, behavior, time): + with self._state.condition: + self._state.times_to_behaviors[time].append(behavior) + if self._active: + self._state.condition.notify_all() + else: + activity = threading.Thread(target=self._activity) + activity.start() + self._active = True + return _Future(self._state, behavior, time) + + def time(self): + return _time.time() + + def call_in(self, behavior, delay): + return self._call_at(behavior, _time.time() + delay) + + def call_at(self, behavior, time): + return self._call_at(behavior, time) + + def sleep_for(self, duration): + time = _time.time() + duration + _time.sleep(duration) + self._ensure_called_through(time) + + def sleep_until(self, time): + _time.sleep(max(0, time - _time.time())) + self._ensure_called_through(time) + + +class StrictFakeTime(grpc_testing.Time): + + def __init__(self, time): + self._state = _State() + self._time = time + + def time(self): + return self._time + + def call_in(self, behavior, delay): + if delay <= 0: + _call_in_thread((behavior,)) + else: + with self._state.condition: + time = self._time + delay + self._state.times_to_behaviors[time].append(behavior) + return _Future(self._state, behavior, time) + + def call_at(self, behavior, time): + with self._state.condition: + if time <= self._time: + _call_in_thread((behavior,)) + else: + self._state.times_to_behaviors[time].append(behavior) + return _Future(self._state, behavior, time) + + def sleep_for(self, duration): + if 0 < duration: + with self._state.condition: + self._time += duration + delta = _process(self._state, self._time) + _call_in_thread(delta.mature_behaviors) + + def sleep_until(self, time): + with self._state.condition: + if self._time < time: + self._time = time + delta = _process(self._state, self._time) + _call_in_thread(delta.mature_behaviors) diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py new file mode 100644 index 0000000000..41a75d46f6 --- /dev/null +++ b/src/python/grpcio_testing/grpc_version.py @@ -0,0 +1,17 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!! + +VERSION = '1.5.0.dev0' diff --git a/src/python/grpcio_testing/setup.py b/src/python/grpcio_testing/setup.py new file mode 100644 index 0000000000..0cc336abd1 --- /dev/null +++ b/src/python/grpcio_testing/setup.py @@ -0,0 +1,44 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Setup module for gRPC Python's testing package.""" + +import os +import sys + +import setuptools + +# Ensure we're in the proper directory whether or not we're being used by pip. +os.chdir(os.path.dirname(os.path.abspath(__file__))) + +# Break import style to ensure that we can find same-directory modules. +import grpc_version + +PACKAGE_DIRECTORIES = { + '': '.', +} + +INSTALL_REQUIRES = ('protobuf>=3.3.0', + 'grpcio>={version}'.format(version=grpc_version.VERSION),) + +setuptools.setup( + name='grpcio-testing', + version=grpc_version.VERSION, + license='Apache License 2.0', + description='Testing utilities for gRPC Python', + author='The gRPC Authors', + author_email='grpc-io@googlegroups.com', + url='https://grpc.io', + package_dir=PACKAGE_DIRECTORIES, + packages=setuptools.find_packages('.'), + install_requires=INSTALL_REQUIRES) diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py index fd47b426ac..9e54dc9f75 100644 --- a/src/python/grpcio_tests/grpc_version.py +++ b/src/python/grpcio_tests/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!! -VERSION='1.5.0.dev0' +VERSION='1.7.0.dev0' diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index adc909ccdc..debe14c40e 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -68,6 +68,10 @@ PACKAGE_DATA = { 'tests.protoc_plugin.protos.invocation_testing.split_services': [ 'services.proto', ], + 'tests.testing.proto': [ + 'requests.proto', + 'services.proto', + ], 'tests.unit': [ 'credentials/ca.pem', 'credentials/server1.key', diff --git a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py index 299ce75e79..a86743fa5a 100644 --- a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py +++ b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py @@ -95,9 +95,6 @@ class ReflectionServicerTest(unittest.TestCase): )),) self.assertSequenceEqual(expected_responses, responses) - @unittest.skip( - 'TODO(mmx): enable when (pure) python protobuf issue is fixed' - '(see https://github.com/google/protobuf/issues/2882)') def testFileContainingExtension(self): requests = (reflection_pb2.ServerReflectionRequest( file_containing_extension=reflection_pb2.ExtensionRequest( diff --git a/src/python/grpcio_tests/tests/testing/__init__.py b/src/python/grpcio_tests/tests/testing/__init__.py new file mode 100644 index 0000000000..1e120359cf --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests/testing/_application_common.py b/src/python/grpcio_tests/tests/testing/_application_common.py new file mode 100644 index 0000000000..4e98879607 --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_application_common.py @@ -0,0 +1,36 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""An example gRPC Python-using application's common code elements.""" + +from tests.testing.proto import requests_pb2 +from tests.testing.proto import services_pb2 + +SERVICE_NAME = 'tests_of_grpc_testing.FirstService' +UNARY_UNARY_METHOD_NAME = 'UnUn' +UNARY_STREAM_METHOD_NAME = 'UnStre' +STREAM_UNARY_METHOD_NAME = 'StreUn' +STREAM_STREAM_METHOD_NAME = 'StreStre' + +UNARY_UNARY_REQUEST = requests_pb2.Up(first_up_field=2) +ERRONEOUS_UNARY_UNARY_REQUEST = requests_pb2.Up(first_up_field=3) +UNARY_UNARY_RESPONSE = services_pb2.Down(first_down_field=5) +ERRONEOUS_UNARY_UNARY_RESPONSE = services_pb2.Down(first_down_field=7) +UNARY_STREAM_REQUEST = requests_pb2.Charm(first_charm_field=11) +STREAM_UNARY_REQUEST = requests_pb2.Charm(first_charm_field=13) +STREAM_UNARY_RESPONSE = services_pb2.Strange(first_strange_field=17) +STREAM_STREAM_REQUEST = requests_pb2.Top(first_top_field=19) +STREAM_STREAM_RESPONSE = services_pb2.Bottom(first_bottom_field=23) +TWO_STREAM_STREAM_RESPONSES = (STREAM_STREAM_RESPONSE,) * 2 + +INFINITE_REQUEST_STREAM_TIMEOUT = 0.2 diff --git a/src/python/grpcio_tests/tests/testing/_application_testing_common.py b/src/python/grpcio_tests/tests/testing/_application_testing_common.py new file mode 100644 index 0000000000..9c9e485a78 --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_application_testing_common.py @@ -0,0 +1,33 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc_testing + +from tests.testing.proto import requests_pb2 +from tests.testing.proto import services_pb2 + +# TODO(https://github.com/grpc/grpc/issues/11657): Eliminate this entirely. +# TODO(https://github.com/google/protobuf/issues/3452): Eliminate this if/else. +if services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None: + FIRST_SERVICE = 'Fix protobuf issue 3452!' + FIRST_SERVICE_UNUN = 'Fix protobuf issue 3452!' + FIRST_SERVICE_UNSTRE = 'Fix protobuf issue 3452!' + FIRST_SERVICE_STREUN = 'Fix protobuf issue 3452!' + FIRST_SERVICE_STRESTRE = 'Fix protobuf issue 3452!' +else: + FIRST_SERVICE = services_pb2.DESCRIPTOR.services_by_name['FirstService'] + FIRST_SERVICE_UNUN = FIRST_SERVICE.methods_by_name['UnUn'] + FIRST_SERVICE_UNSTRE = FIRST_SERVICE.methods_by_name['UnStre'] + FIRST_SERVICE_STREUN = FIRST_SERVICE.methods_by_name['StreUn'] + FIRST_SERVICE_STRESTRE = FIRST_SERVICE.methods_by_name['StreStre'] diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py new file mode 100644 index 0000000000..aff32fb4dc --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_client_application.py @@ -0,0 +1,260 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""An example gRPC Python-using client-side application.""" + +import collections +import enum +import threading +import time + +import grpc +from tests.unit.framework.common import test_constants + +from tests.testing.proto import requests_pb2 +from tests.testing.proto import services_pb2 +from tests.testing.proto import services_pb2_grpc + +from tests.testing import _application_common + + +@enum.unique +class Scenario(enum.Enum): + UNARY_UNARY = 'unary unary' + UNARY_STREAM = 'unary stream' + STREAM_UNARY = 'stream unary' + STREAM_STREAM = 'stream stream' + CONCURRENT_STREAM_UNARY = 'concurrent stream unary' + CONCURRENT_STREAM_STREAM = 'concurrent stream stream' + CANCEL_UNARY_UNARY = 'cancel unary unary' + CANCEL_UNARY_STREAM = 'cancel unary stream' + INFINITE_REQUEST_STREAM = 'infinite request stream' + + +class Outcome(collections.namedtuple('Outcome', ('kind', 'code', 'details'))): + """Outcome of a client application scenario. + + Attributes: + kind: A Kind value describing the overall kind of scenario execution. + code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR. + details: A status details string. Only valid if kind is Kind.RPC_ERROR. + """ + + @enum.unique + class Kind(enum.Enum): + SATISFACTORY = 'satisfactory' + UNSATISFACTORY = 'unsatisfactory' + RPC_ERROR = 'rpc error' + + +_SATISFACTORY_OUTCOME = Outcome(Outcome.Kind.SATISFACTORY, None, None) +_UNSATISFACTORY_OUTCOME = Outcome(Outcome.Kind.UNSATISFACTORY, None, None) + + +class _Pipe(object): + + def __init__(self): + self._condition = threading.Condition() + self._values = [] + self._open = True + + def __iter__(self): + return self + + def _next(self): + with self._condition: + while True: + if self._values: + return self._values.pop(0) + elif not self._open: + raise StopIteration() + else: + self._condition.wait() + + def __next__(self): # (Python 3 Iterator Protocol) + return self._next() + + def next(self): # (Python 2 Iterator Protocol) + return self._next() + + def add(self, value): + with self._condition: + self._values.append(value) + self._condition.notify_all() + + def close(self): + with self._condition: + self._open = False + self._condition.notify_all() + + +def _run_unary_unary(stub): + response = stub.UnUn(_application_common.UNARY_UNARY_REQUEST) + if _application_common.UNARY_UNARY_RESPONSE == response: + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def _run_unary_stream(stub): + response_iterator = stub.UnStre(_application_common.UNARY_STREAM_REQUEST) + try: + next(response_iterator) + except StopIteration: + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def _run_stream_unary(stub): + response, call = stub.StreUn.with_call( + iter((_application_common.STREAM_UNARY_REQUEST,) * 3)) + if (_application_common.STREAM_UNARY_RESPONSE == response and + call.code() is grpc.StatusCode.OK): + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def _run_stream_stream(stub): + request_pipe = _Pipe() + response_iterator = stub.StreStre(iter(request_pipe)) + request_pipe.add(_application_common.STREAM_STREAM_REQUEST) + first_responses = next(response_iterator), next(response_iterator), + request_pipe.add(_application_common.STREAM_STREAM_REQUEST) + second_responses = next(response_iterator), next(response_iterator), + request_pipe.close() + try: + next(response_iterator) + except StopIteration: + unexpected_extra_response = False + else: + unexpected_extra_response = True + if (first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES and + second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES + and not unexpected_extra_response): + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def _run_concurrent_stream_unary(stub): + future_calls = tuple( + stub.StreUn.future( + iter((_application_common.STREAM_UNARY_REQUEST,) * 3)) + for _ in range(test_constants.THREAD_CONCURRENCY)) + for future_call in future_calls: + if future_call.code() is grpc.StatusCode.OK: + response = future_call.result() + if _application_common.STREAM_UNARY_RESPONSE != response: + return _UNSATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + else: + return _SATISFACTORY_OUTCOME + + +def _run_concurrent_stream_stream(stub): + condition = threading.Condition() + outcomes = [None] * test_constants.RPC_CONCURRENCY + + def run_stream_stream(index): + outcome = _run_stream_stream(stub) + with condition: + outcomes[index] = outcome + condition.notify() + + for index in range(test_constants.RPC_CONCURRENCY): + thread = threading.Thread(target=run_stream_stream, args=(index,)) + thread.start() + with condition: + while True: + if all(outcomes): + for outcome in outcomes: + if outcome.kind is not Outcome.Kind.SATISFACTORY: + return _UNSATISFACTORY_OUTCOME + else: + return _SATISFACTORY_OUTCOME + else: + condition.wait() + + +def _run_cancel_unary_unary(stub): + response_future_call = stub.UnUn.future( + _application_common.UNARY_UNARY_REQUEST) + initial_metadata = response_future_call.initial_metadata() + cancelled = response_future_call.cancel() + if initial_metadata is not None and cancelled: + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def _run_infinite_request_stream(stub): + + def infinite_request_iterator(): + while True: + yield _application_common.STREAM_UNARY_REQUEST + + response_future_call = stub.StreUn.future( + infinite_request_iterator(), + timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT) + if response_future_call.code() is grpc.StatusCode.DEADLINE_EXCEEDED: + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def run(scenario, channel): + stub = services_pb2_grpc.FirstServiceStub(channel) + try: + if scenario is Scenario.UNARY_UNARY: + return _run_unary_unary(stub) + elif scenario is Scenario.UNARY_STREAM: + return _run_unary_stream(stub) + elif scenario is Scenario.STREAM_UNARY: + return _run_stream_unary(stub) + elif scenario is Scenario.STREAM_STREAM: + return _run_stream_stream(stub) + elif scenario is Scenario.CONCURRENT_STREAM_UNARY: + return _run_concurrent_stream_unary(stub) + elif scenario is Scenario.CONCURRENT_STREAM_STREAM: + return _run_concurrent_stream_stream(stub) + elif scenario is Scenario.CANCEL_UNARY_UNARY: + return _run_cancel_unary_unary(stub) + elif scenario is Scenario.INFINITE_REQUEST_STREAM: + return _run_infinite_request_stream(stub) + except grpc.RpcError as rpc_error: + return Outcome(Outcome.Kind.RPC_ERROR, + rpc_error.code(), rpc_error.details()) + + +_IMPLEMENTATIONS = { + Scenario.UNARY_UNARY: _run_unary_unary, + Scenario.UNARY_STREAM: _run_unary_stream, + Scenario.STREAM_UNARY: _run_stream_unary, + Scenario.STREAM_STREAM: _run_stream_stream, + Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary, + Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream, + Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary, + Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream, +} + + +def run(scenario, channel): + stub = services_pb2_grpc.FirstServiceStub(channel) + try: + return _IMPLEMENTATIONS[scenario](stub) + except grpc.RpcError as rpc_error: + return Outcome(Outcome.Kind.RPC_ERROR, + rpc_error.code(), rpc_error.details()) diff --git a/src/python/grpcio_tests/tests/testing/_client_test.py b/src/python/grpcio_tests/tests/testing/_client_test.py new file mode 100644 index 0000000000..172f386d7b --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_client_test.py @@ -0,0 +1,306 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent import futures +import time +import unittest + +import grpc +from grpc.framework.foundation import logging_pool +from tests.unit.framework.common import test_constants +import grpc_testing + +from tests.testing import _application_common +from tests.testing import _application_testing_common +from tests.testing import _client_application +from tests.testing.proto import requests_pb2 +from tests.testing.proto import services_pb2 + + +# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip. +@unittest.skipIf( + services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None, + 'Fix protobuf issue 3452!') +class ClientTest(unittest.TestCase): + + def setUp(self): + # In this test the client-side application under test executes in + # a separate thread while we retain use of the test thread to "play + # server". + self._client_execution_thread_pool = logging_pool.pool(1) + + self._fake_time = grpc_testing.strict_fake_time(time.time()) + self._real_time = grpc_testing.strict_real_time() + self._fake_time_channel = grpc_testing.channel( + services_pb2.DESCRIPTOR.services_by_name.values(), self._fake_time) + self._real_time_channel = grpc_testing.channel( + services_pb2.DESCRIPTOR.services_by_name.values(), self._real_time) + + def tearDown(self): + self._client_execution_thread_pool.shutdown(wait=True) + + def test_successful_unary_unary(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.UNARY_UNARY, + self._real_time_channel) + invocation_metadata, request, rpc = ( + self._real_time_channel.take_unary_unary( + _application_testing_common.FIRST_SERVICE_UNUN)) + rpc.send_initial_metadata(()) + rpc.terminate(_application_common.UNARY_UNARY_RESPONSE, (), + grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_successful_unary_stream(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.UNARY_STREAM, + self._fake_time_channel) + invocation_metadata, request, rpc = ( + self._fake_time_channel.take_unary_stream( + _application_testing_common.FIRST_SERVICE_UNSTRE)) + rpc.send_initial_metadata(()) + rpc.terminate((), grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.UNARY_STREAM_REQUEST, request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_successful_stream_unary(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.STREAM_UNARY, + self._real_time_channel) + invocation_metadata, rpc = self._real_time_channel.take_stream_unary( + _application_testing_common.FIRST_SERVICE_STREUN) + rpc.send_initial_metadata(()) + first_request = rpc.take_request() + second_request = rpc.take_request() + third_request = rpc.take_request() + rpc.requests_closed() + rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (), + grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + first_request) + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + second_request) + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + third_request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_successful_stream_stream(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.STREAM_STREAM, + self._fake_time_channel) + invocation_metadata, rpc = self._fake_time_channel.take_stream_stream( + _application_testing_common.FIRST_SERVICE_STRESTRE) + first_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + second_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.requests_closed() + rpc.terminate((), grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + first_request) + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + second_request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_concurrent_stream_stream(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, + _client_application.Scenario.CONCURRENT_STREAM_STREAM, + self._real_time_channel) + rpcs = [] + for _ in range(test_constants.RPC_CONCURRENCY): + invocation_metadata, rpc = ( + self._real_time_channel.take_stream_stream( + _application_testing_common.FIRST_SERVICE_STRESTRE)) + rpcs.append(rpc) + requests = {} + for rpc in rpcs: + requests[rpc] = [rpc.take_request()] + for rpc in rpcs: + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + for rpc in rpcs: + requests[rpc].append(rpc.take_request()) + for rpc in rpcs: + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + for rpc in rpcs: + rpc.requests_closed() + for rpc in rpcs: + rpc.terminate((), grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + for requests_of_one_rpc in requests.values(): + for request in requests_of_one_rpc: + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_cancelled_unary_unary(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, + _client_application.Scenario.CANCEL_UNARY_UNARY, + self._fake_time_channel) + invocation_metadata, request, rpc = ( + self._fake_time_channel.take_unary_unary( + _application_testing_common.FIRST_SERVICE_UNUN)) + rpc.send_initial_metadata(()) + rpc.cancelled() + application_return_value = application_future.result() + + self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_status_stream_unary(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, + _client_application.Scenario.CONCURRENT_STREAM_UNARY, + self._fake_time_channel) + rpcs = tuple( + self._fake_time_channel.take_stream_unary( + _application_testing_common.FIRST_SERVICE_STREUN)[1] + for _ in range(test_constants.THREAD_CONCURRENCY)) + for rpc in rpcs: + rpc.take_request() + rpc.take_request() + rpc.take_request() + rpc.requests_closed() + rpc.send_initial_metadata(( + ('my_metadata_key', 'My Metadata Value!',),)) + for rpc in rpcs[:-1]: + rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (), + grpc.StatusCode.OK, '') + rpcs[-1].terminate(_application_common.STREAM_UNARY_RESPONSE, (), + grpc.StatusCode.RESOURCE_EXHAUSTED, + 'nope; not able to handle all those RPCs!') + application_return_value = application_future.result() + + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.UNSATISFACTORY) + + def test_status_stream_stream(self): + code = grpc.StatusCode.DEADLINE_EXCEEDED + details = 'test deadline exceeded!' + + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.STREAM_STREAM, + self._real_time_channel) + invocation_metadata, rpc = self._real_time_channel.take_stream_stream( + _application_testing_common.FIRST_SERVICE_STRESTRE) + first_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + second_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.requests_closed() + rpc.terminate((), code, details) + application_return_value = application_future.result() + + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + first_request) + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + second_request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.RPC_ERROR) + self.assertIs(application_return_value.code, code) + self.assertEqual(application_return_value.details, details) + + def test_misbehaving_server_unary_unary(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.UNARY_UNARY, + self._fake_time_channel) + invocation_metadata, request, rpc = ( + self._fake_time_channel.take_unary_unary( + _application_testing_common.FIRST_SERVICE_UNUN)) + rpc.send_initial_metadata(()) + rpc.terminate(_application_common.ERRONEOUS_UNARY_UNARY_RESPONSE, (), + grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.UNSATISFACTORY) + + def test_misbehaving_server_stream_stream(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.STREAM_STREAM, + self._real_time_channel) + invocation_metadata, rpc = self._real_time_channel.take_stream_stream( + _application_testing_common.FIRST_SERVICE_STRESTRE) + first_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + second_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.requests_closed() + rpc.terminate((), grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + first_request) + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + second_request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.UNSATISFACTORY) + + def test_infinite_request_stream_real_time(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, + _client_application.Scenario.INFINITE_REQUEST_STREAM, + self._real_time_channel) + invocation_metadata, rpc = self._real_time_channel.take_stream_unary( + _application_testing_common.FIRST_SERVICE_STREUN) + rpc.send_initial_metadata(()) + first_request = rpc.take_request() + second_request = rpc.take_request() + third_request = rpc.take_request() + self._real_time.sleep_for( + _application_common.INFINITE_REQUEST_STREAM_TIMEOUT) + rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (), + grpc.StatusCode.DEADLINE_EXCEEDED, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + first_request) + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + second_request) + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + third_request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/testing/_time_test.py b/src/python/grpcio_tests/tests/testing/_time_test.py new file mode 100644 index 0000000000..797394ae20 --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_time_test.py @@ -0,0 +1,165 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import threading +import time +import unittest + +import grpc_testing + +_QUANTUM = 0.3 +_MANY = 10000 +# Tests that run in real time can either wait for the scheduler to +# eventually run what needs to be run (and risk timing out) or declare +# that the scheduler didn't schedule work reasonably fast enough. We +# choose the latter for this test. +_PATHOLOGICAL_SCHEDULING = 'pathological thread scheduling!' + + +class _TimeNoter(object): + + def __init__(self, time): + self._condition = threading.Condition() + self._time = time + self._call_times = [] + + def __call__(self): + with self._condition: + self._call_times.append(self._time.time()) + + def call_times(self): + with self._condition: + return tuple(self._call_times) + + +class TimeTest(object): + + def test_sleep_for(self): + start_time = self._time.time() + self._time.sleep_for(_QUANTUM) + end_time = self._time.time() + + self.assertLessEqual(start_time + _QUANTUM, end_time) + + def test_sleep_until(self): + start_time = self._time.time() + self._time.sleep_until(start_time + _QUANTUM) + end_time = self._time.time() + + self.assertLessEqual(start_time + _QUANTUM, end_time) + + def test_call_in(self): + time_noter = _TimeNoter(self._time) + + start_time = self._time.time() + self._time.call_in(time_noter, _QUANTUM) + self._time.sleep_for(_QUANTUM * 2) + call_times = time_noter.call_times() + + self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING) + self.assertLessEqual(start_time + _QUANTUM, call_times[0]) + + def test_call_at(self): + time_noter = _TimeNoter(self._time) + + start_time = self._time.time() + self._time.call_at(time_noter, self._time.time() + _QUANTUM) + self._time.sleep_for(_QUANTUM * 2) + call_times = time_noter.call_times() + + self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING) + self.assertLessEqual(start_time + _QUANTUM, call_times[0]) + + def test_cancel(self): + time_noter = _TimeNoter(self._time) + + future = self._time.call_in(time_noter, _QUANTUM * 2) + self._time.sleep_for(_QUANTUM) + cancelled = future.cancel() + self._time.sleep_for(_QUANTUM * 2) + call_times = time_noter.call_times() + + self.assertFalse(call_times, msg=_PATHOLOGICAL_SCHEDULING) + self.assertTrue(cancelled) + self.assertTrue(future.cancelled()) + + def test_many(self): + test_events = tuple(threading.Event() for _ in range(_MANY)) + possibly_cancelled_futures = {} + background_noise_futures = [] + + for test_event in test_events: + possibly_cancelled_futures[test_event] = self._time.call_in( + test_event.set, _QUANTUM * (2 + random.random())) + for _ in range(_MANY): + background_noise_futures.append( + self._time.call_in(threading.Event().set, _QUANTUM * 1000 * + random.random())) + self._time.sleep_for(_QUANTUM) + cancelled = set() + for test_event, test_future in possibly_cancelled_futures.items(): + if bool(random.randint(0, 1)) and test_future.cancel(): + cancelled.add(test_event) + self._time.sleep_for(_QUANTUM * 3) + + for test_event in test_events: + (self.assertFalse if test_event in cancelled else + self.assertTrue)(test_event.is_set()) + for background_noise_future in background_noise_futures: + background_noise_future.cancel() + + def test_same_behavior_used_several_times(self): + time_noter = _TimeNoter(self._time) + + start_time = self._time.time() + first_future_at_one = self._time.call_in(time_noter, _QUANTUM) + second_future_at_one = self._time.call_in(time_noter, _QUANTUM) + first_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3) + second_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3) + self._time.sleep_for(_QUANTUM * 2) + first_future_at_one_cancelled = first_future_at_one.cancel() + second_future_at_one_cancelled = second_future_at_one.cancel() + first_future_at_three_cancelled = first_future_at_three.cancel() + self._time.sleep_for(_QUANTUM * 2) + second_future_at_three_cancelled = second_future_at_three.cancel() + first_future_at_three_cancelled_again = first_future_at_three.cancel() + call_times = time_noter.call_times() + + self.assertEqual(3, len(call_times), msg=_PATHOLOGICAL_SCHEDULING) + self.assertFalse(first_future_at_one_cancelled) + self.assertFalse(second_future_at_one_cancelled) + self.assertTrue(first_future_at_three_cancelled) + self.assertFalse(second_future_at_three_cancelled) + self.assertTrue(first_future_at_three_cancelled_again) + self.assertLessEqual(start_time + _QUANTUM, call_times[0]) + self.assertLessEqual(start_time + _QUANTUM, call_times[1]) + self.assertLessEqual(start_time + _QUANTUM * 3, call_times[2]) + + +class StrictRealTimeTest(TimeTest, unittest.TestCase): + + def setUp(self): + self._time = grpc_testing.strict_real_time() + + +class StrictFakeTimeTest(TimeTest, unittest.TestCase): + + def setUp(self): + self._time = grpc_testing.strict_fake_time( + random.randint(0, int(time.time()))) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/testing/proto/__init__.py b/src/python/grpcio_tests/tests/testing/proto/__init__.py new file mode 100644 index 0000000000..1e120359cf --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/proto/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests/testing/proto/requests.proto b/src/python/grpcio_tests/tests/testing/proto/requests.proto new file mode 100644 index 0000000000..54a60bff86 --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/proto/requests.proto @@ -0,0 +1,29 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package tests_of_grpc_testing; + +message Up { + int32 first_up_field = 1; +} + +message Charm { + int32 first_charm_field = 1; +} + +message Top { + int32 first_top_field = 1; +} diff --git a/src/python/grpcio_tests/tests/testing/proto/services.proto b/src/python/grpcio_tests/tests/testing/proto/services.proto new file mode 100644 index 0000000000..cb15c0d1ce --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/proto/services.proto @@ -0,0 +1,42 @@ +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +import "tests/testing/proto/requests.proto"; + +package tests_of_grpc_testing; + +message Down { + int32 first_down_field = 1; +} + +message Strange { + int32 first_strange_field = 1; +} + +message Bottom { + int32 first_bottom_field = 1; +} + +service FirstService { + rpc UnUn(Up) returns (Down); + rpc UnStre(Charm) returns (stream Strange); + rpc StreUn(stream Charm) returns (Strange); + rpc StreStre(stream Top) returns (stream Bottom); +} + +service SecondService { + rpc UnStre(Strange) returns (stream Charm); +} diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 126e8ac60d..c10719b86f 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -9,6 +9,9 @@ "protoc_plugin._split_definitions_test.SplitSeparateTest", "protoc_plugin.beta_python_plugin_test.PythonPluginTest", "reflection._reflection_servicer_test.ReflectionServicerTest", + "testing._client_test.ClientTest", + "testing._time_test.StrictFakeTimeTest", + "testing._time_test.StrictRealTimeTest", "unit._api_test.AllTest", "unit._api_test.ChannelConnectivityTest", "unit._api_test.ChannelTest", |