aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2015-02-19 22:03:26 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2015-02-20 18:54:30 +0000
commit331fcb9dc314ab4560adada3813b34d3ee33c7eb (patch)
tree4b82e0c1efbd83976d1d86c831d4058750512fce /src
parent04e3a6e3327e4851a4eaa15fbf1eaec74a5abe3a (diff)
Add SyncAsync interfaces and implementations.
These invocation-side values allow users to choose between synchronous and asynchronous execution of their RPCs with a single attribute access.
Diffstat (limited to 'src')
-rw-r--r--src/python/src/grpc/framework/face/implementations.py38
-rw-r--r--src/python/src/grpc/framework/face/interfaces.py114
-rw-r--r--src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py6
-rw-r--r--src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py8
4 files changed, 160 insertions, 6 deletions
diff --git a/src/python/src/grpc/framework/face/implementations.py b/src/python/src/grpc/framework/face/implementations.py
index c499b90720..86948b386f 100644
--- a/src/python/src/grpc/framework/face/implementations.py
+++ b/src/python/src/grpc/framework/face/implementations.py
@@ -56,6 +56,38 @@ class _BaseServicer(base_interfaces.Servicer):
raise _base_exceptions.NoSuchMethodError()
+class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync):
+
+ def __init__(self, front, name):
+ self._front = front
+ self._name = name
+
+ def __call__(self, request, timeout):
+ return _calls.blocking_value_in_value_out(
+ self._front, self._name, request, timeout, 'unused trace ID')
+
+ def async(self, request, timeout):
+ return _calls.future_value_in_value_out(
+ self._front, self._name, request, timeout, 'unused trace ID')
+
+
+class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync):
+
+ def __init__(self, front, name, pool):
+ self._front = front
+ self._name = name
+ self._pool = pool
+
+ def __call__(self, request_iterator, timeout):
+ return _calls.blocking_stream_in_value_out(
+ self._front, self._name, request_iterator, timeout, 'unused trace ID')
+
+ def async(self, request_iterator, timeout):
+ return _calls.future_stream_in_value_out(
+ self._front, self._name, request_iterator, timeout, 'unused trace ID',
+ self._pool)
+
+
class _Server(interfaces.Server):
"""An interfaces.Server implementation."""
@@ -117,6 +149,12 @@ class _Stub(interfaces.Stub):
self._front, name, response_consumer, abortion_callback, timeout,
'unused trace ID')
+ def unary_unary_sync_async(self, name):
+ return _UnaryUnarySyncAsync(self._front, name)
+
+ def stream_unary_sync_async(self, name):
+ return _StreamUnarySyncAsync(self._front, name, self._pool)
+
def _aggregate_methods(
pool,
diff --git a/src/python/src/grpc/framework/face/interfaces.py b/src/python/src/grpc/framework/face/interfaces.py
index 548e9ce4db..9e19106e6f 100644
--- a/src/python/src/grpc/framework/face/interfaces.py
+++ b/src/python/src/grpc/framework/face/interfaces.py
@@ -59,6 +59,96 @@ class CancellableIterator(object):
raise NotImplementedError()
+class UnaryUnarySyncAsync(object):
+ """Affords invoking a unary-unary RPC synchronously or asynchronously.
+
+ Values implementing this interface are directly callable and present an
+ "async" method. Both calls take a request value and a numeric timeout.
+ Direct invocation of a value of this type invokes its associated RPC and
+ blocks until the RPC's response is available. Calling the "async" method
+ of a value of this type invokes its associated RPC and immediately returns a
+ future.Future bound to the asynchronous execution of the RPC.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __call__(self, request, timeout):
+ """Synchronously invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ The response value for the RPC.
+
+ Raises:
+ exceptions.RpcError: Indicating that the RPC was aborted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def async(self, request, timeout):
+ """Asynchronously invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ A future.Future representing the RPC. In the event of RPC completion, the
+ returned Future's result value will be the response value of the RPC.
+ In the event of RPC abortion, the returned Future's exception value
+ will be an exceptions.RpcError.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnarySyncAsync(object):
+ """Affords invoking a stream-unary RPC synchronously or asynchronously.
+
+ Values implementing this interface are directly callable and present an
+ "async" method. Both calls take an iterator of request values and a numeric
+ timeout. Direct invocation of a value of this type invokes its associated RPC
+ and blocks until the RPC's response is available. Calling the "async" method
+ of a value of this type invokes its associated RPC and immediately returns a
+ future.Future bound to the asynchronous execution of the RPC.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __call__(self, request_iterator, timeout):
+ """Synchronously invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ The response value for the RPC.
+
+ Raises:
+ exceptions.RpcError: Indicating that the RPC was aborted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def async(self, request, timeout):
+ """Asynchronously invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ A future.Future representing the RPC. In the event of RPC completion, the
+ returned Future's result value will be the response value of the RPC.
+ In the event of RPC abortion, the returned Future's exception value
+ will be an exceptions.RpcError.
+ """
+ raise NotImplementedError()
+
+
@enum.unique
class Abortion(enum.Enum):
"""Categories of RPC abortion."""
@@ -540,3 +630,27 @@ class Stub(object):
request values of the RPC should be passed.
"""
raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_unary_sync_async(self, name):
+ """Creates a UnaryUnarySyncAsync value for a unary-unary RPC method.
+
+ Args:
+ name: The RPC method name.
+
+ Returns:
+ A UnaryUnarySyncAsync value for the named unary-unary RPC method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_unary_sync_async(self, name):
+ """Creates a StreamUnarySyncAsync value for a stream-unary RPC method.
+
+ Args:
+ name: The RPC method name.
+
+ Returns:
+ A StreamUnarySyncAsync value for the named stream-unary RPC method.
+ """
+ raise NotImplementedError()
diff --git a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py
index 993098f4ae..30ff4a3290 100644
--- a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py
+++ b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py
@@ -146,7 +146,8 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
- self.stub.blocking_value_in_value_out(name, request, _TIMEOUT)
+ sync_async = self.stub.unary_unary_sync_async(name)
+ sync_async(request, _TIMEOUT)
def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -168,7 +169,8 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
- self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT)
+ sync_async = self.stub.stream_unary_sync_async(name)
+ sync_async(iter(requests), _TIMEOUT)
def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
diff --git a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
index 42db3050e1..c87846f2ef 100644
--- a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
+++ b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
@@ -190,8 +190,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
request = test_messages.request()
with self.control.pause():
- response_future = self.stub.future_value_in_value_out(
- name, request, _TIMEOUT)
+ sync_async = self.stub.unary_unary_sync_async(name)
+ response_future = sync_async.async(request, _TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
@@ -216,8 +216,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
requests = test_messages.requests()
with self.control.pause():
- response_future = self.stub.future_stream_in_value_out(
- name, iter(requests), _TIMEOUT)
+ sync_async = self.stub.stream_unary_sync_async(name)
+ response_future = sync_async.async(iter(requests), _TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):