aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_testing/grpc_testing/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_testing/grpc_testing/__init__.py')
-rw-r--r--src/python/grpcio_testing/grpc_testing/__init__.py289
1 files changed, 289 insertions, 0 deletions
diff --git a/src/python/grpcio_testing/grpc_testing/__init__.py b/src/python/grpcio_testing/grpc_testing/__init__.py
index c5a17f457a..14e25f09e2 100644
--- a/src/python/grpcio_testing/grpc_testing/__init__.py
+++ b/src/python/grpcio_testing/grpc_testing/__init__.py
@@ -15,11 +15,284 @@
import abc
+from google.protobuf import descriptor
import six
import grpc
+class UnaryUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a unary-unary RPC invoked by a system under test.
+
+ Enables users to "play server" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def send_initial_metadata(self, initial_metadata):
+ """Sends the RPC's initial metadata to the system under test.
+
+ Args:
+ initial_metadata: The RPC's initial metadata to be "sent" to
+ the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancelled(self):
+ """Blocks until the system under test has cancelled the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self, response, trailing_metadata, code, details):
+ """Terminates the RPC.
+
+ Args:
+ response: The response for the RPC.
+ trailing_metadata: The RPC's trailing metadata.
+ code: The RPC's status code.
+ details: The RPC's status details.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamChannelRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a unary-stream RPC invoked by a system under test.
+
+ Enables users to "play server" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def send_initial_metadata(self, initial_metadata):
+ """Sends the RPC's initial metadata to the system under test.
+
+ Args:
+ initial_metadata: The RPC's initial metadata to be "sent" to
+ the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_response(self, response):
+ """Sends a response to the system under test.
+
+ Args:
+ response: A response message to be "sent" to the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancelled(self):
+ """Blocks until the system under test has cancelled the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self, trailing_metadata, code, details):
+ """Terminates the RPC.
+
+ Args:
+ trailing_metadata: The RPC's trailing metadata.
+ code: The RPC's status code.
+ details: The RPC's status details.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a stream-unary RPC invoked by a system under test.
+
+ Enables users to "play server" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def send_initial_metadata(self, initial_metadata):
+ """Sends the RPC's initial metadata to the system under test.
+
+ Args:
+ initial_metadata: The RPC's initial metadata to be "sent" to
+ the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_request(self):
+ """Draws one of the requests added to the RPC by the system under test.
+
+ This method blocks until the system under test has added to the RPC
+ the request to be returned.
+
+ Successive calls to this method return requests in the same order in
+ which the system under test added them to the RPC.
+
+ Returns:
+ A request message added to the RPC by the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def requests_closed(self):
+ """Blocks until the system under test has closed the request stream."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancelled(self):
+ """Blocks until the system under test has cancelled the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self, response, trailing_metadata, code, details):
+ """Terminates the RPC.
+
+ Args:
+ response: The response for the RPC.
+ trailing_metadata: The RPC's trailing metadata.
+ code: The RPC's status code.
+ details: The RPC's status details.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamChannelRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a stream-stream RPC invoked by a system under test.
+
+ Enables users to "play server" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def send_initial_metadata(self, initial_metadata):
+ """Sends the RPC's initial metadata to the system under test.
+
+ Args:
+ initial_metadata: The RPC's initial metadata to be "sent" to the
+ system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_request(self):
+ """Draws one of the requests added to the RPC by the system under test.
+
+ This method blocks until the system under test has added to the RPC
+ the request to be returned.
+
+ Successive calls to this method return requests in the same order in
+ which the system under test added them to the RPC.
+
+ Returns:
+ A request message added to the RPC by the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_response(self, response):
+ """Sends a response to the system under test.
+
+ Args:
+ response: A response messages to be "sent" to the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def requests_closed(self):
+ """Blocks until the system under test has closed the request stream."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancelled(self):
+ """Blocks until the system under test has cancelled the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self, trailing_metadata, code, details):
+ """Terminates the RPC.
+
+ Args:
+ trailing_metadata: The RPC's trailing metadata.
+ code: The RPC's status code.
+ details: The RPC's status details.
+ """
+ raise NotImplementedError()
+
+
+class Channel(six.with_metaclass(abc.ABCMeta), grpc.Channel):
+ """A grpc.Channel double with which to test a system that invokes RPCs."""
+
+ @abc.abstractmethod
+ def take_unary_unary(self, method_descriptor):
+ """Draws an RPC currently being made by the system under test.
+
+ If the given descriptor does not identify any RPC currently being made
+ by the system under test, this method blocks until the system under
+ test invokes such an RPC.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a
+ unary-unary RPC method.
+
+ Returns:
+ A (invocation_metadata, request, unary_unary_channel_rpc) tuple of
+ the RPC's invocation metadata, its request, and a
+ UnaryUnaryChannelRpc with which to "play server" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_unary_stream(self, method_descriptor):
+ """Draws an RPC currently being made by the system under test.
+
+ If the given descriptor does not identify any RPC currently being made
+ by the system under test, this method blocks until the system under
+ test invokes such an RPC.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a
+ unary-stream RPC method.
+
+ Returns:
+ A (invocation_metadata, request, unary_stream_channel_rpc) tuple of
+ the RPC's invocation metadata, its request, and a
+ UnaryStreamChannelRpc with which to "play server" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_stream_unary(self, method_descriptor):
+ """Draws an RPC currently being made by the system under test.
+
+ If the given descriptor does not identify any RPC currently being made
+ by the system under test, this method blocks until the system under
+ test invokes such an RPC.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a
+ stream-unary RPC method.
+
+ Returns:
+ A (invocation_metadata, stream_unary_channel_rpc) tuple of the RPC's
+ invocation metadata and a StreamUnaryChannelRpc with which to "play
+ server" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_stream_stream(self, method_descriptor):
+ """Draws an RPC currently being made by the system under test.
+
+ If the given descriptor does not identify any RPC currently being made
+ by the system under test, this method blocks until the system under
+ test invokes such an RPC.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a
+ stream-stream RPC method.
+
+ Returns:
+ A (invocation_metadata, stream_stream_channel_rpc) tuple of the RPC's
+ invocation metadata and a StreamStreamChannelRpc with which to
+ "play server" for the RPC.
+ """
+ raise NotImplementedError()
+
+
class Time(six.with_metaclass(abc.ABCMeta)):
"""A simulation of time.
@@ -117,3 +390,19 @@ def strict_fake_time(now):
"""
from grpc_testing import _time
return _time.StrictFakeTime(now)
+
+
+def channel(service_descriptors, time):
+ """Creates a Channel for use in tests of a gRPC Python-using system.
+
+ Args:
+ service_descriptors: An iterable of descriptor.ServiceDescriptors
+ describing the RPCs that will be made on the returned Channel by the
+ system under test.
+ time: A Time to be used for tests.
+
+ Returns:
+ A Channel for use in tests.
+ """
+ from grpc_testing import _channel
+ return _channel.testing_channel(service_descriptors, time)