diff options
author | Nathaniel Manista <nathaniel@google.com> | 2017-07-28 01:24:52 +0000 |
---|---|---|
committer | Nathaniel Manista <nathaniel@google.com> | 2017-08-03 22:38:40 +0000 |
commit | 2010985ab269c8df0443e4f3782cbdffb083e9d4 (patch) | |
tree | 06a2a9a8a3f061542eeeac42edf2089b78c179be /src/python/grpcio_testing | |
parent | 69b7231776dc42c87abad33430c66e7b302bf00c (diff) |
gRPC Python test infrastructure
(The channel-related second part of it.)
Diffstat (limited to 'src/python/grpcio_testing')
9 files changed, 1263 insertions, 0 deletions
diff --git a/src/python/grpcio_testing/grpc_testing/__init__.py b/src/python/grpcio_testing/grpc_testing/__init__.py index c5a17f457a..14e25f09e2 100644 --- a/src/python/grpcio_testing/grpc_testing/__init__.py +++ b/src/python/grpcio_testing/grpc_testing/__init__.py @@ -15,11 +15,284 @@ import abc +from google.protobuf import descriptor import six import grpc +class UnaryUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a unary-unary RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to + the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, response, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + response: The response for the RPC. + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class UnaryStreamChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a unary-stream RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to + the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def send_response(self, response): + """Sends a response to the system under test. + + Args: + response: A response message to be "sent" to the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class StreamUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a stream-unary RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to + the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_request(self): + """Draws one of the requests added to the RPC by the system under test. + + This method blocks until the system under test has added to the RPC + the request to be returned. + + Successive calls to this method return requests in the same order in + which the system under test added them to the RPC. + + Returns: + A request message added to the RPC by the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def requests_closed(self): + """Blocks until the system under test has closed the request stream.""" + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, response, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + response: The response for the RPC. + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class StreamStreamChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a stream-stream RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to the + system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_request(self): + """Draws one of the requests added to the RPC by the system under test. + + This method blocks until the system under test has added to the RPC + the request to be returned. + + Successive calls to this method return requests in the same order in + which the system under test added them to the RPC. + + Returns: + A request message added to the RPC by the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def send_response(self, response): + """Sends a response to the system under test. + + Args: + response: A response messages to be "sent" to the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def requests_closed(self): + """Blocks until the system under test has closed the request stream.""" + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class Channel(six.with_metaclass(abc.ABCMeta), grpc.Channel): + """A grpc.Channel double with which to test a system that invokes RPCs.""" + + @abc.abstractmethod + def take_unary_unary(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + unary-unary RPC method. + + Returns: + A (invocation_metadata, request, unary_unary_channel_rpc) tuple of + the RPC's invocation metadata, its request, and a + UnaryUnaryChannelRpc with which to "play server" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_unary_stream(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + unary-stream RPC method. + + Returns: + A (invocation_metadata, request, unary_stream_channel_rpc) tuple of + the RPC's invocation metadata, its request, and a + UnaryStreamChannelRpc with which to "play server" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_stream_unary(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + stream-unary RPC method. + + Returns: + A (invocation_metadata, stream_unary_channel_rpc) tuple of the RPC's + invocation metadata and a StreamUnaryChannelRpc with which to "play + server" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_stream_stream(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + stream-stream RPC method. + + Returns: + A (invocation_metadata, stream_stream_channel_rpc) tuple of the RPC's + invocation metadata and a StreamStreamChannelRpc with which to + "play server" for the RPC. + """ + raise NotImplementedError() + + class Time(six.with_metaclass(abc.ABCMeta)): """A simulation of time. @@ -117,3 +390,19 @@ def strict_fake_time(now): """ from grpc_testing import _time return _time.StrictFakeTime(now) + + +def channel(service_descriptors, time): + """Creates a Channel for use in tests of a gRPC Python-using system. + + Args: + service_descriptors: An iterable of descriptor.ServiceDescriptors + describing the RPCs that will be made on the returned Channel by the + system under test. + time: A Time to be used for tests. + + Returns: + A Channel for use in tests. + """ + from grpc_testing import _channel + return _channel.testing_channel(service_descriptors, time) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/__init__.py b/src/python/grpcio_testing/grpc_testing/_channel/__init__.py new file mode 100644 index 0000000000..8011975d0a --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/__init__.py @@ -0,0 +1,23 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from grpc_testing._channel import _channel +from grpc_testing._channel import _channel_state + + +# descriptors is reserved for later use. +# pylint: disable=unused-argument +def testing_channel(descriptors, time): + return _channel.TestingChannel(time, _channel_state.State()) +# pylint: enable=unused-argument diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py new file mode 100644 index 0000000000..fbd064db88 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py @@ -0,0 +1,62 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc_testing +from grpc_testing._channel import _channel_rpc +from grpc_testing._channel import _multi_callable + + +# All serializer and deserializer parameters are not (yet) used by this +# test infrastructure. +# pylint: disable=unused-argument +class TestingChannel(grpc_testing.Channel): + + def __init__(self, time, state): + self._time = time + self._state = state + + def subscribe(self, callback, try_to_connect=False): + raise NotImplementedError() + + def unsubscribe(self, callback): + raise NotImplementedError() + + def unary_unary( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.UnaryUnary(method, self._state) + + def unary_stream( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.UnaryStream(method, self._state) + + def stream_unary( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.StreamUnary(method, self._state) + + def stream_stream( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.StreamStream(method, self._state) + + def take_unary_unary(self, method_descriptor): + return _channel_rpc.unary_unary(self._state, method_descriptor) + + def take_unary_stream(self, method_descriptor): + return _channel_rpc.unary_stream(self._state, method_descriptor) + + def take_stream_unary(self, method_descriptor): + return _channel_rpc.stream_unary(self._state, method_descriptor) + + def take_stream_stream(self, method_descriptor): + return _channel_rpc.stream_stream(self._state, method_descriptor) +# pylint: enable=unused-argument diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py new file mode 100644 index 0000000000..762b6a035b --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py @@ -0,0 +1,119 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc_testing + + +class _UnaryUnary(grpc_testing.UnaryUnaryChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, response, trailing_metadata, code, details): + self._rpc_state.terminate_with_response( + response, trailing_metadata, code, details) + + +class _UnaryStream(grpc_testing.UnaryStreamChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def send_response(self, response): + self._rpc_state.send_response(response) + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, trailing_metadata, code, details): + self._rpc_state.terminate(trailing_metadata, code, details) + + +class _StreamUnary(grpc_testing.StreamUnaryChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def take_request(self): + return self._rpc_state.take_request() + + def requests_closed(self): + return self._rpc_state.requests_closed() + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, response, trailing_metadata, code, details): + self._rpc_state.terminate_with_response( + response, trailing_metadata, code, details) + + +class _StreamStream(grpc_testing.StreamStreamChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def take_request(self): + return self._rpc_state.take_request() + + def send_response(self, response): + self._rpc_state.send_response(response) + + def requests_closed(self): + return self._rpc_state.requests_closed() + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, trailing_metadata, code, details): + self._rpc_state.terminate(trailing_metadata, code, details) + + +def unary_unary(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + invocation_metadata, request = ( + rpc_state.take_invocation_metadata_and_request()) + return invocation_metadata, request, _UnaryUnary(rpc_state) + + +def unary_stream(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + invocation_metadata, request = ( + rpc_state.take_invocation_metadata_and_request()) + return invocation_metadata, request, _UnaryStream(rpc_state) + + +def stream_unary(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + return rpc_state.take_invocation_metadata(), _StreamUnary(rpc_state) + + +def stream_stream(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + return rpc_state.take_invocation_metadata(), _StreamStream(rpc_state) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py new file mode 100644 index 0000000000..569c41d79d --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py @@ -0,0 +1,48 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import threading + +from grpc_testing import _common +from grpc_testing._channel import _rpc_state + + +class State(_common.ChannelHandler): + + def __init__(self): + self._condition = threading.Condition() + self._rpc_states = collections.defaultdict(list) + + def invoke_rpc( + self, method_full_rpc_name, invocation_metadata, requests, + requests_closed, timeout): + rpc_state = _rpc_state.State( + invocation_metadata, requests, requests_closed) + with self._condition: + self._rpc_states[method_full_rpc_name].append(rpc_state) + self._condition.notify_all() + return rpc_state + + def take_rpc_state(self, method_descriptor): + method_full_rpc_name = '/{}/{}'.format( + method_descriptor.containing_service.full_name, + method_descriptor.name) + with self._condition: + while True: + method_rpc_states = self._rpc_states[method_full_rpc_name] + if method_rpc_states: + return method_rpc_states.pop(0) + else: + self._condition.wait() diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py new file mode 100644 index 0000000000..ebce652eeb --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py @@ -0,0 +1,322 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading + +import grpc + +_NOT_YET_OBSERVED = object() + + +def _cancel(handler): + return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!') + + +def _is_active(handler): + return handler.is_active() + + +def _time_remaining(unused_handler): + raise NotImplementedError() + + +def _add_callback(handler, callback): + return handler.add_callback(callback) + + +def _initial_metadata(handler): + return handler.initial_metadata() + + +def _trailing_metadata(handler): + trailing_metadata, unused_code, unused_details = handler.termination() + return trailing_metadata + + +def _code(handler): + unused_trailing_metadata, code, unused_details = handler.termination() + return code + + +def _details(handler): + unused_trailing_metadata, unused_code, details = handler.termination() + return details + + +class _Call(grpc.Call): + + def __init__(self, handler): + self._handler = handler + + def cancel(self): + _cancel(self._handler) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) + + +class _RpcErrorCall(grpc.RpcError, grpc.Call): + + def __init__(self, handler): + self._handler = handler + + def cancel(self): + _cancel(self._handler) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) + + +def _next(handler): + read = handler.take_response() + if read.code is None: + return read.response + elif read.code is grpc.StatusCode.OK: + raise StopIteration() + else: + raise _RpcErrorCall(handler) + + +class _HandlerExtras(object): + + def __init__(self): + self.condition = threading.Condition() + self.unary_response = _NOT_YET_OBSERVED + self.cancelled = False + + +def _with_extras_cancel(handler, extras): + with extras.condition: + if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'): + extras.cancelled = True + return True + else: + return False + + +def _extras_without_cancelled(extras): + with extras.condition: + return extras.cancelled + + +def _running(handler): + return handler.is_active() + + +def _done(handler): + return not handler.is_active() + + +def _with_extras_unary_response(handler, extras): + with extras.condition: + if extras.unary_response is _NOT_YET_OBSERVED: + read = handler.take_response() + if read.code is None: + extras.unary_response = read.response + return read.response + else: + raise _RpcErrorCall(handler) + else: + return extras.unary_response + + +def _exception(unused_handler): + raise NotImplementedError('TODO!') + + +def _traceback(unused_handler): + raise NotImplementedError('TODO!') + + +def _add_done_callback(handler, callback, future): + adapted_callback = lambda: callback(future) + if not handler.add_callback(adapted_callback): + callback(future) + + +class _FutureCall(grpc.Future, grpc.Call): + + def __init__(self, handler, extras): + self._handler = handler + self._extras = extras + + def cancel(self): + return _with_extras_cancel(self._handler, self._extras) + + def cancelled(self): + return _extras_without_cancelled(self._extras) + + def running(self): + return _running(self._handler) + + def done(self): + return _done(self._handler) + + def result(self): + return _with_extras_unary_response(self._handler, self._extras) + + def exception(self): + return _exception(self._handler) + + def traceback(self): + return _traceback(self._handler) + + def add_done_callback(self, fn): + _add_done_callback(self._handler, fn, self) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) + + +def consume_requests(request_iterator, handler): + + def _consume(): + while True: + try: + request = next(request_iterator) + added = handler.add_request(request) + if not added: + break + except StopIteration: + handler.close_requests() + break + except Exception: # pylint: disable=broad-except + details = 'Exception iterating requests!' + logging.exception(details) + handler.cancel(grpc.StatusCode.UNKNOWN, details) + + consumption = threading.Thread(target=_consume) + consumption.start() + + +def blocking_unary_response(handler): + read = handler.take_response() + if read.code is None: + unused_trailing_metadata, code, unused_details = handler.termination() + if code is grpc.StatusCode.OK: + return read.response + else: + raise _RpcErrorCall(handler) + else: + raise _RpcErrorCall(handler) + + +def blocking_unary_response_with_call(handler): + read = handler.take_response() + if read.code is None: + unused_trailing_metadata, code, unused_details = handler.termination() + if code is grpc.StatusCode.OK: + return read.response, _Call(handler) + else: + raise _RpcErrorCall(handler) + else: + raise _RpcErrorCall(handler) + + +def future_call(handler): + return _FutureCall(handler, _HandlerExtras()) + + +class ResponseIteratorCall(grpc.Call): + + def __init__(self, handler): + self._handler = handler + + def __iter__(self): + return self + + def __next__(self): + return _next(self._handler) + + def next(self): + return _next(self._handler) + + def cancel(self): + _cancel(self._handler) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py b/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py new file mode 100644 index 0000000000..fe69257f5b --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py @@ -0,0 +1,115 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc +from grpc_testing import _common +from grpc_testing._channel import _invocation + +# All per-call credentials parameters are unused by this test infrastructure. +# pylint: disable=unused-argument +class UnaryUnary(grpc.UnaryUnaryMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) + return _invocation.blocking_unary_response(rpc_handler) + + def with_call(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) + return _invocation.blocking_unary_response_with_call(rpc_handler) + + def future(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) + return _invocation.future_call(rpc_handler) + + +class UnaryStream(grpc.StreamStreamMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [request], True, timeout) + return _invocation.ResponseIteratorCall(rpc_handler) + + +class StreamUnary(grpc.StreamUnaryMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.blocking_unary_response(rpc_handler) + + def with_call(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.blocking_unary_response_with_call(rpc_handler) + + def future(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.future_call(rpc_handler) + + +class StreamStream(grpc.StreamStreamMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.ResponseIteratorCall(rpc_handler) +# pylint: enable=unused-argument diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py b/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py new file mode 100644 index 0000000000..e1fa49a2a8 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py @@ -0,0 +1,193 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading + +import grpc +from grpc_testing import _common + + +class State(_common.ChannelRpcHandler): + + def __init__(self, invocation_metadata, requests, requests_closed): + self._condition = threading.Condition() + self._invocation_metadata = invocation_metadata + self._requests = requests + self._requests_closed = requests_closed + self._initial_metadata = None + self._responses = [] + self._trailing_metadata = None + self._code = None + self._details = None + + def initial_metadata(self): + with self._condition: + while True: + if self._initial_metadata is None: + if self._code is None: + self._condition.wait() + else: + return _common.FUSSED_EMPTY_METADATA + else: + return self._initial_metadata + + def add_request(self, request): + with self._condition: + if self._code is None and not self._requests_closed: + self._requests.append(request) + self._condition.notify_all() + return True + else: + return False + + def close_requests(self): + with self._condition: + if self._code is None and not self._requests_closed: + self._requests_closed = True + self._condition.notify_all() + + def take_response(self): + with self._condition: + while True: + if self._code is grpc.StatusCode.OK: + if self._responses: + response = self._responses.pop(0) + return _common.ChannelRpcRead( + response, None, None, None) + else: + return _common.ChannelRpcRead( + None, self._trailing_metadata, + grpc.StatusCode.OK, self._details) + elif self._code is None: + if self._responses: + response = self._responses.pop(0) + return _common.ChannelRpcRead( + response, None, None, None) + else: + self._condition.wait() + else: + return _common.ChannelRpcRead( + None, self._trailing_metadata, self._code, + self._details) + + def termination(self): + with self._condition: + while True: + if self._code is None: + self._condition.wait() + else: + return self._trailing_metadata, self._code, self._details + + def cancel(self, code, details): + with self._condition: + if self._code is None: + if self._initial_metadata is None: + self._initial_metadata = _common.FUSSED_EMPTY_METADATA + self._trailing_metadata = _common.FUSSED_EMPTY_METADATA + self._code = code + self._details = details + self._condition.notify_all() + return True + else: + return False + + def take_invocation_metadata(self): + with self._condition: + if self._invocation_metadata is None: + raise ValueError('Expected invocation metadata!') + else: + invocation_metadata = self._invocation_metadata + self._invocation_metadata = None + return invocation_metadata + + def take_invocation_metadata_and_request(self): + with self._condition: + if self._invocation_metadata is None: + raise ValueError('Expected invocation metadata!') + elif not self._requests: + raise ValueError('Expected at least one request!') + else: + invocation_metadata = self._invocation_metadata + self._invocation_metadata = None + return invocation_metadata, self._requests.pop(0) + + def send_initial_metadata(self, initial_metadata): + with self._condition: + self._initial_metadata = _common.fuss_with_metadata( + initial_metadata) + self._condition.notify_all() + + def take_request(self): + with self._condition: + while True: + if self._requests: + return self._requests.pop(0) + else: + self._condition.wait() + + def requests_closed(self): + with self._condition: + while True: + if self._requests_closed: + return + else: + self._condition.wait() + + def send_response(self, response): + with self._condition: + if self._code is None: + self._responses.append(response) + self._condition.notify_all() + + def terminate_with_response( + self, response, trailing_metadata, code, details): + with self._condition: + if self._initial_metadata is None: + self._initial_metadata = _common.FUSSED_EMPTY_METADATA + self._responses.append(response) + self._trailing_metadata = _common.fuss_with_metadata( + trailing_metadata) + self._code = code + self._details = details + self._condition.notify_all() + + def terminate(self, trailing_metadata, code, details): + with self._condition: + if self._initial_metadata is None: + self._initial_metadata = _common.FUSSED_EMPTY_METADATA + self._trailing_metadata = _common.fuss_with_metadata( + trailing_metadata) + self._code = code + self._details = details + self._condition.notify_all() + + def cancelled(self): + with self._condition: + while True: + if self._code is grpc.StatusCode.CANCELLED: + return + elif self._code is None: + self._condition.wait() + else: + raise ValueError( + 'Status code unexpectedly {}!'.format(self._code)) + + def is_active(self): + raise NotImplementedError() + + def time_remaining(self): + raise NotImplementedError() + + def add_callback(self, callback): + raise NotImplementedError() diff --git a/src/python/grpcio_testing/grpc_testing/_common.py b/src/python/grpcio_testing/grpc_testing/_common.py new file mode 100644 index 0000000000..cb4a7f5fa2 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_common.py @@ -0,0 +1,92 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Common interfaces and implementation.""" + +import abc +import collections + +import six + + +def _fuss(tuplified_metadata): + return tuplified_metadata + ( + ( + 'grpc.metadata_added_by_runtime', + 'gRPC is allowed to add metadata in transmission and does so.', + ), + ) + +FUSSED_EMPTY_METADATA = _fuss(()) + + +def fuss_with_metadata(metadata): + if metadata is None: + return FUSSED_EMPTY_METADATA + else: + return _fuss(tuple(metadata)) + + +class ChannelRpcRead( + collections.namedtuple( + 'ChannelRpcRead', + ('response', 'trailing_metadata', 'code', 'details',))): + pass + + +class ChannelRpcHandler(six.with_metaclass(abc.ABCMeta)): + + @abc.abstractmethod + def initial_metadata(self): + raise NotImplementedError() + + @abc.abstractmethod + def add_request(self, request): + raise NotImplementedError() + + @abc.abstractmethod + def close_requests(self): + raise NotImplementedError() + + @abc.abstractmethod + def take_response(self): + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self, code, details): + raise NotImplementedError() + + @abc.abstractmethod + def termination(self): + raise NotImplementedError() + + @abc.abstractmethod + def is_active(self): + raise NotImplementedError() + + @abc.abstractmethod + def time_remaining(self): + raise NotImplementedError() + + @abc.abstractmethod + def add_callback(self, callback): + raise NotImplementedError() + + +class ChannelHandler(six.with_metaclass(abc.ABCMeta)): + + @abc.abstractmethod + def invoke_rpc( + self, method_full_rpc_name, invocation_metadata, requests, + requests_closed, timeout): + raise NotImplementedError() |