aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
authorGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-02-25 12:34:28 -0800
committerGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-02-25 12:34:28 -0800
commit02a74dd244d18390adcd76162cb853304dd44964 (patch)
treeae1d539e7a96b3268d256b02d9d9cbc7f1399ffc /src/python
parent08d27fcd115e6e9a88b195c919fb23afa0f810e9 (diff)
parent9280790a5d5817855463d8a940d386954dc62ce1 (diff)
Merge pull request #785 from nathanielmanistaatgoogle/early-adopter-changes
Final changes to the early_adopter API.
Diffstat (limited to 'src/python')
-rw-r--r--src/python/interop/interop/methods.py34
-rw-r--r--src/python/src/grpc/early_adopter/_assembly_utilities.py168
-rw-r--r--src/python/src/grpc/early_adopter/_face_utilities.py178
-rw-r--r--src/python/src/grpc/early_adopter/_reexport.py207
-rw-r--r--src/python/src/grpc/early_adopter/exceptions.py48
-rw-r--r--src/python/src/grpc/early_adopter/implementations.py145
-rw-r--r--src/python/src/grpc/early_adopter/implementations_test.py176
-rw-r--r--src/python/src/grpc/early_adopter/interfaces.py271
-rw-r--r--src/python/src/grpc/early_adopter/utilities.py132
9 files changed, 1012 insertions, 347 deletions
diff --git a/src/python/interop/interop/methods.py b/src/python/interop/interop/methods.py
index 26c1869f93..6d5990087e 100644
--- a/src/python/interop/interop/methods.py
+++ b/src/python/interop/interop/methods.py
@@ -34,47 +34,47 @@ from grpc.early_adopter import utilities
from interop import empty_pb2
from interop import messages_pb2
-def _empty_call(request):
+def _empty_call(request, unused_context):
return empty_pb2.Empty()
-_CLIENT_EMPTY_CALL = utilities.unary_unary_client_rpc_method(
+_CLIENT_EMPTY_CALL = utilities.unary_unary_invocation_description(
empty_pb2.Empty.SerializeToString, empty_pb2.Empty.FromString)
-_SERVER_EMPTY_CALL = utilities.unary_unary_server_rpc_method(
+_SERVER_EMPTY_CALL = utilities.unary_unary_service_description(
_empty_call, empty_pb2.Empty.FromString,
empty_pb2.Empty.SerializeToString)
-def _unary_call(request):
+def _unary_call(request, unused_context):
return messages_pb2.SimpleResponse(
payload=messages_pb2.Payload(
type=messages_pb2.COMPRESSABLE,
body=b'\x00' * request.response_size))
-_CLIENT_UNARY_CALL = utilities.unary_unary_client_rpc_method(
+_CLIENT_UNARY_CALL = utilities.unary_unary_invocation_description(
messages_pb2.SimpleRequest.SerializeToString,
messages_pb2.SimpleResponse.FromString)
-_SERVER_UNARY_CALL = utilities.unary_unary_server_rpc_method(
+_SERVER_UNARY_CALL = utilities.unary_unary_service_description(
_unary_call, messages_pb2.SimpleRequest.FromString,
messages_pb2.SimpleResponse.SerializeToString)
-def _streaming_output_call(request):
+def _streaming_output_call(request, unused_context):
for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
type=request.response_type,
body=b'\x00' * response_parameters.size))
-_CLIENT_STREAMING_OUTPUT_CALL = utilities.unary_stream_client_rpc_method(
+_CLIENT_STREAMING_OUTPUT_CALL = utilities.unary_stream_invocation_description(
messages_pb2.StreamingOutputCallRequest.SerializeToString,
messages_pb2.StreamingOutputCallResponse.FromString)
-_SERVER_STREAMING_OUTPUT_CALL = utilities.unary_stream_server_rpc_method(
+_SERVER_STREAMING_OUTPUT_CALL = utilities.unary_stream_service_description(
_streaming_output_call,
messages_pb2.StreamingOutputCallRequest.FromString,
messages_pb2.StreamingOutputCallResponse.SerializeToString)
-def _streaming_input_call(request_iterator):
+def _streaming_input_call(request_iterator, unused_context):
aggregate_size = 0
for request in request_iterator:
if request.payload and request.payload.body:
@@ -82,35 +82,35 @@ def _streaming_input_call(request_iterator):
return messages_pb2.StreamingInputCallResponse(
aggregated_payload_size=aggregate_size)
-_CLIENT_STREAMING_INPUT_CALL = utilities.stream_unary_client_rpc_method(
+_CLIENT_STREAMING_INPUT_CALL = utilities.stream_unary_invocation_description(
messages_pb2.StreamingInputCallRequest.SerializeToString,
messages_pb2.StreamingInputCallResponse.FromString)
-_SERVER_STREAMING_INPUT_CALL = utilities.stream_unary_server_rpc_method(
+_SERVER_STREAMING_INPUT_CALL = utilities.stream_unary_service_description(
_streaming_input_call,
messages_pb2.StreamingInputCallRequest.FromString,
messages_pb2.StreamingInputCallResponse.SerializeToString)
-def _full_duplex_call(request_iterator):
+def _full_duplex_call(request_iterator, unused_context):
for request in request_iterator:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
type=request.payload.type,
body=b'\x00' * request.response_parameters[0].size))
-_CLIENT_FULL_DUPLEX_CALL = utilities.stream_stream_client_rpc_method(
+_CLIENT_FULL_DUPLEX_CALL = utilities.stream_stream_invocation_description(
messages_pb2.StreamingOutputCallRequest.SerializeToString,
messages_pb2.StreamingOutputCallResponse.FromString)
-_SERVER_FULL_DUPLEX_CALL = utilities.stream_stream_server_rpc_method(
+_SERVER_FULL_DUPLEX_CALL = utilities.stream_stream_service_description(
_full_duplex_call,
messages_pb2.StreamingOutputCallRequest.FromString,
messages_pb2.StreamingOutputCallResponse.SerializeToString)
# NOTE(nathaniel): Apparently this is the same as the full-duplex call?
-_CLIENT_HALF_DUPLEX_CALL = utilities.stream_stream_client_rpc_method(
+_CLIENT_HALF_DUPLEX_CALL = utilities.stream_stream_invocation_description(
messages_pb2.StreamingOutputCallRequest.SerializeToString,
messages_pb2.StreamingOutputCallResponse.FromString)
-_SERVER_HALF_DUPLEX_CALL = utilities.stream_stream_server_rpc_method(
+_SERVER_HALF_DUPLEX_CALL = utilities.stream_stream_service_description(
_full_duplex_call,
messages_pb2.StreamingOutputCallRequest.FromString,
messages_pb2.StreamingOutputCallResponse.SerializeToString)
diff --git a/src/python/src/grpc/early_adopter/_assembly_utilities.py b/src/python/src/grpc/early_adopter/_assembly_utilities.py
new file mode 100644
index 0000000000..facfc2bf0e
--- /dev/null
+++ b/src/python/src/grpc/early_adopter/_assembly_utilities.py
@@ -0,0 +1,168 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import abc
+import collections
+
+# assembly_interfaces is referenced from specification in this module.
+from grpc.framework.assembly import interfaces as assembly_interfaces # pylint: disable=unused-import
+from grpc.framework.assembly import utilities as assembly_utilities
+from grpc.early_adopter import _reexport
+from grpc.early_adopter import interfaces
+
+
+# TODO(issue 726): Kill the "implementations" attribute of this in favor
+# of the same-information-less-bogusly-represented "cardinalities".
+class InvocationBreakdown(object):
+ """An intermediate representation of invocation-side views of RPC methods.
+
+ Attributes:
+ cardinalities: A dictionary from RPC method name to interfaces.Cardinality
+ value.
+ implementations: A dictionary from RPC method name to
+ assembly_interfaces.MethodImplementation describing the method.
+ request_serializers: A dictionary from RPC method name to callable
+ behavior to be used serializing request values for the RPC.
+ response_deserializers: A dictionary from RPC method name to callable
+ behavior to be used deserializing response values for the RPC.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class _EasyInvocationBreakdown(
+ InvocationBreakdown,
+ collections.namedtuple(
+ '_EasyInvocationBreakdown',
+ ('cardinalities', 'implementations', 'request_serializers',
+ 'response_deserializers'))):
+ pass
+
+
+class ServiceBreakdown(object):
+ """An intermediate representation of service-side views of RPC methods.
+
+ Attributes:
+ implementations: A dictionary from RPC method name
+ assembly_interfaces.MethodImplementation implementing the RPC method.
+ request_deserializers: A dictionary from RPC method name to callable
+ behavior to be used deserializing request values for the RPC.
+ response_serializers: A dictionary from RPC method name to callable
+ behavior to be used serializing response values for the RPC.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class _EasyServiceBreakdown(
+ ServiceBreakdown,
+ collections.namedtuple(
+ '_EasyServiceBreakdown',
+ ('implementations', 'request_deserializers', 'response_serializers'))):
+ pass
+
+
+def break_down_invocation(method_descriptions):
+ """Derives an InvocationBreakdown from several RPC method descriptions.
+
+ Args:
+ method_descriptions: A dictionary from RPC method name to
+ interfaces.RpcMethodInvocationDescription describing the RPCs.
+
+ Returns:
+ An InvocationBreakdown corresponding to the given method descriptions.
+ """
+ cardinalities = {}
+ implementations = {}
+ request_serializers = {}
+ response_deserializers = {}
+ for name, method_description in method_descriptions.iteritems():
+ cardinality = method_description.cardinality()
+ cardinalities[name] = cardinality
+ if cardinality is interfaces.Cardinality.UNARY_UNARY:
+ implementations[name] = assembly_utilities.unary_unary_inline(None)
+ elif cardinality is interfaces.Cardinality.UNARY_STREAM:
+ implementations[name] = assembly_utilities.unary_stream_inline(None)
+ elif cardinality is interfaces.Cardinality.STREAM_UNARY:
+ implementations[name] = assembly_utilities.stream_unary_inline(None)
+ elif cardinality is interfaces.Cardinality.STREAM_STREAM:
+ implementations[name] = assembly_utilities.stream_stream_inline(None)
+ request_serializers[name] = method_description.serialize_request
+ response_deserializers[name] = method_description.deserialize_response
+ return _EasyInvocationBreakdown(
+ cardinalities, implementations, request_serializers,
+ response_deserializers)
+
+
+def break_down_service(method_descriptions):
+ """Derives a ServiceBreakdown from several RPC method descriptions.
+
+ Args:
+ method_descriptions: A dictionary from RPC method name to
+ interfaces.RpcMethodServiceDescription describing the RPCs.
+
+ Returns:
+ A ServiceBreakdown corresponding to the given method descriptions.
+ """
+ implementations = {}
+ request_deserializers = {}
+ response_serializers = {}
+ for name, method_description in method_descriptions.iteritems():
+ cardinality = method_description.cardinality()
+ if cardinality is interfaces.Cardinality.UNARY_UNARY:
+ def service(
+ request, face_rpc_context,
+ service_behavior=method_description.service_unary_unary):
+ return service_behavior(
+ request, _reexport.rpc_context(face_rpc_context))
+ implementations[name] = assembly_utilities.unary_unary_inline(service)
+ elif cardinality is interfaces.Cardinality.UNARY_STREAM:
+ def service(
+ request, face_rpc_context,
+ service_behavior=method_description.service_unary_stream):
+ return service_behavior(
+ request, _reexport.rpc_context(face_rpc_context))
+ implementations[name] = assembly_utilities.unary_stream_inline(service)
+ elif cardinality is interfaces.Cardinality.STREAM_UNARY:
+ def service(
+ request_iterator, face_rpc_context,
+ service_behavior=method_description.service_stream_unary):
+ return service_behavior(
+ request_iterator, _reexport.rpc_context(face_rpc_context))
+ implementations[name] = assembly_utilities.stream_unary_inline(service)
+ elif cardinality is interfaces.Cardinality.STREAM_STREAM:
+ def service(
+ request_iterator, face_rpc_context,
+ service_behavior=method_description.service_stream_stream):
+ return service_behavior(
+ request_iterator, _reexport.rpc_context(face_rpc_context))
+ implementations[name] = assembly_utilities.stream_stream_inline(service)
+ request_deserializers[name] = method_description.deserialize_request
+ response_serializers[name] = method_description.serialize_response
+
+ return _EasyServiceBreakdown(
+ implementations, request_deserializers, response_serializers)
diff --git a/src/python/src/grpc/early_adopter/_face_utilities.py b/src/python/src/grpc/early_adopter/_face_utilities.py
deleted file mode 100644
index 3e37b08752..0000000000
--- a/src/python/src/grpc/early_adopter/_face_utilities.py
+++ /dev/null
@@ -1,178 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import abc
-import collections
-
-from grpc.framework.face import interfaces as face_interfaces
-
-from grpc.early_adopter import interfaces
-
-
-class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod):
-
- def __init__(self, unary_unary_server_rpc_method):
- self._method = unary_unary_server_rpc_method
-
- def service(self, request, context):
- """See face_interfaces.InlineValueInValueOutMethod.service for spec."""
- return self._method.service_unary_unary(request)
-
-
-class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod):
-
- def __init__(self, unary_stream_server_rpc_method):
- self._method = unary_stream_server_rpc_method
-
- def service(self, request, context):
- """See face_interfaces.InlineValueInStreamOutMethod.service for spec."""
- return self._method.service_unary_stream(request)
-
-
-class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod):
-
- def __init__(self, stream_unary_server_rpc_method):
- self._method = stream_unary_server_rpc_method
-
- def service(self, request_iterator, context):
- """See face_interfaces.InlineStreamInValueOutMethod.service for spec."""
- return self._method.service_stream_unary(request_iterator)
-
-
-class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod):
-
- def __init__(self, stream_stream_server_rpc_method):
- self._method = stream_stream_server_rpc_method
-
- def service(self, request_iterator, context):
- """See face_interfaces.InlineStreamInStreamOutMethod.service for spec."""
- return self._method.service_stream_stream(request_iterator)
-
-
-class ClientBreakdown(object):
- """An intermediate representation of invocation-side views of RPC methods.
-
- Attributes:
- request_serializers: A dictionary from RPC method name to callable
- behavior to be used serializing request values for the RPC.
- response_deserializers: A dictionary from RPC method name to callable
- behavior to be used deserializing response values for the RPC.
- """
- __metaclass__ = abc.ABCMeta
-
-
-class _EasyClientBreakdown(
- ClientBreakdown,
- collections.namedtuple(
- '_EasyClientBreakdown',
- ('request_serializers', 'response_deserializers'))):
- pass
-
-
-class ServerBreakdown(object):
- """An intermediate representation of implementations of RPC methods.
-
- Attributes:
- unary_unary_methods: A dictionary from RPC method name to callable
- behavior implementing the RPC method for unary-unary RPC methods.
- unary_stream_methods: A dictionary from RPC method name to callable
- behavior implementing the RPC method for unary-stream RPC methods.
- stream_unary_methods: A dictionary from RPC method name to callable
- behavior implementing the RPC method for stream-unary RPC methods.
- stream_stream_methods: A dictionary from RPC method name to callable
- behavior implementing the RPC method for stream-stream RPC methods.
- request_deserializers: A dictionary from RPC method name to callable
- behavior to be used deserializing request values for the RPC.
- response_serializers: A dictionary from RPC method name to callable
- behavior to be used serializing response values for the RPC.
- """
- __metaclass__ = abc.ABCMeta
-
-
-
-class _EasyServerBreakdown(
- ServerBreakdown,
- collections.namedtuple(
- '_EasyServerBreakdown',
- ('unary_unary_methods', 'unary_stream_methods', 'stream_unary_methods',
- 'stream_stream_methods', 'request_deserializers',
- 'response_serializers'))):
- pass
-
-
-def client_break_down(methods):
- """Derives a ClientBreakdown from several interfaces.ClientRpcMethods.
-
- Args:
- methods: A dictionary from RPC mthod name to
- interfaces.ClientRpcMethod object describing the RPCs.
-
- Returns:
- A ClientBreakdown corresponding to the given methods.
- """
- request_serializers = {}
- response_deserializers = {}
- for name, method in methods.iteritems():
- request_serializers[name] = method.serialize_request
- response_deserializers[name] = method.deserialize_response
- return _EasyClientBreakdown(request_serializers, response_deserializers)
-
-
-def server_break_down(methods):
- """Derives a ServerBreakdown from several interfaces.ServerRpcMethods.
-
- Args:
- methods: A dictionary from RPC mthod name to
- interfaces.ServerRpcMethod object describing the RPCs.
-
- Returns:
- A ServerBreakdown corresponding to the given methods.
- """
- unary_unary = {}
- unary_stream = {}
- stream_unary = {}
- stream_stream = {}
- request_deserializers = {}
- response_serializers = {}
- for name, method in methods.iteritems():
- cardinality = method.cardinality()
- if cardinality is interfaces.Cardinality.UNARY_UNARY:
- unary_unary[name] = _InlineUnaryUnaryMethod(method)
- elif cardinality is interfaces.Cardinality.UNARY_STREAM:
- unary_stream[name] = _InlineUnaryStreamMethod(method)
- elif cardinality is interfaces.Cardinality.STREAM_UNARY:
- stream_unary[name] = _InlineStreamUnaryMethod(method)
- elif cardinality is interfaces.Cardinality.STREAM_STREAM:
- stream_stream[name] = _InlineStreamStreamMethod(method)
- request_deserializers[name] = method.deserialize_request
- response_serializers[name] = method.serialize_response
-
- return _EasyServerBreakdown(
- unary_unary, unary_stream, stream_unary, stream_stream,
- request_deserializers, response_serializers)
diff --git a/src/python/src/grpc/early_adopter/_reexport.py b/src/python/src/grpc/early_adopter/_reexport.py
new file mode 100644
index 0000000000..35855bc9c8
--- /dev/null
+++ b/src/python/src/grpc/early_adopter/_reexport.py
@@ -0,0 +1,207 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import abc
+import collections
+
+from grpc.framework.face import exceptions as face_exceptions
+from grpc.framework.face import interfaces as face_interfaces
+from grpc.framework.foundation import future
+from grpc.early_adopter import exceptions
+from grpc.early_adopter import interfaces
+
+_ABORTION_REEXPORT = {
+ face_interfaces.Abortion.CANCELLED: interfaces.Abortion.CANCELLED,
+ face_interfaces.Abortion.EXPIRED: interfaces.Abortion.EXPIRED,
+ face_interfaces.Abortion.NETWORK_FAILURE:
+ interfaces.Abortion.NETWORK_FAILURE,
+ face_interfaces.Abortion.SERVICED_FAILURE:
+ interfaces.Abortion.SERVICED_FAILURE,
+ face_interfaces.Abortion.SERVICER_FAILURE:
+ interfaces.Abortion.SERVICER_FAILURE,
+}
+
+
+class _RpcError(exceptions.RpcError):
+ pass
+
+
+def _reexport_error(face_rpc_error):
+ if isinstance(face_rpc_error, face_exceptions.CancellationError):
+ return exceptions.CancellationError()
+ elif isinstance(face_rpc_error, face_exceptions.ExpirationError):
+ return exceptions.ExpirationError()
+ else:
+ return _RpcError()
+
+
+def _as_face_abortion_callback(abortion_callback):
+ def face_abortion_callback(face_abortion):
+ abortion_callback(_ABORTION_REEXPORT[face_abortion])
+ return face_abortion_callback
+
+
+class _ReexportedFuture(future.Future):
+
+ def __init__(self, face_future):
+ self._face_future = face_future
+
+ def cancel(self):
+ return self._face_future.cancel()
+
+ def cancelled(self):
+ return self._face_future.cancelled()
+
+ def running(self):
+ return self._face_future.running()
+
+ def done(self):
+ return self._face_future.done()
+
+ def result(self, timeout=None):
+ try:
+ return self._face_future.result(timeout=timeout)
+ except face_exceptions.RpcError as e:
+ raise _reexport_error(e)
+
+ def exception(self, timeout=None):
+ face_error = self._face_future.exception(timeout=timeout)
+ return None if face_error is None else _reexport_error(face_error)
+
+ def traceback(self, timeout=None):
+ return self._face_future.traceback(timeout=timeout)
+
+ def add_done_callback(self, fn):
+ self._face_future.add_done_callback(lambda unused_face_future: fn(self))
+
+
+def _call_reexporting_errors(behavior, *args, **kwargs):
+ try:
+ return behavior(*args, **kwargs)
+ except face_exceptions.RpcError as e:
+ raise _reexport_error(e)
+
+
+def _reexported_future(face_future):
+ return _ReexportedFuture(face_future)
+
+
+class _CancellableIterator(interfaces.CancellableIterator):
+
+ def __init__(self, face_cancellable_iterator):
+ self._face_cancellable_iterator = face_cancellable_iterator
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ return _call_reexporting_errors(self._face_cancellable_iterator.next)
+
+ def cancel(self):
+ self._face_cancellable_iterator.cancel()
+
+
+class _RpcContext(interfaces.RpcContext):
+
+ def __init__(self, face_rpc_context):
+ self._face_rpc_context = face_rpc_context
+
+ def is_active(self):
+ return self._face_rpc_context.is_active()
+
+ def time_remaining(self):
+ return self._face_rpc_context.time_remaining()
+
+ def add_abortion_callback(self, abortion_callback):
+ self._face_rpc_context.add_abortion_callback(
+ _as_face_abortion_callback(abortion_callback))
+
+
+class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync):
+
+ def __init__(self, face_unary_unary_sync_async):
+ self._underlying = face_unary_unary_sync_async
+
+ def __call__(self, request, timeout):
+ return _call_reexporting_errors(
+ self._underlying, request, timeout)
+
+ def async(self, request, timeout):
+ return _ReexportedFuture(self._underlying.async(request, timeout))
+
+
+class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync):
+
+ def __init__(self, face_stream_unary_sync_async):
+ self._underlying = face_stream_unary_sync_async
+
+ def __call__(self, request_iterator, timeout):
+ return _call_reexporting_errors(
+ self._underlying, request_iterator, timeout)
+
+ def async(self, request_iterator, timeout):
+ return _ReexportedFuture(self._underlying.async(request_iterator, timeout))
+
+
+class _Stub(interfaces.Stub):
+
+ def __init__(self, assembly_stub, cardinalities):
+ self._assembly_stub = assembly_stub
+ self._cardinalities = cardinalities
+
+ def __enter__(self):
+ self._assembly_stub.__enter__()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._assembly_stub.__exit__(exc_type, exc_val, exc_tb)
+ return False
+
+ def __getattr__(self, attr):
+ underlying_attr = self._assembly_stub.__getattr__(attr)
+ cardinality = self._cardinalities.get(attr)
+ if cardinality is interfaces.Cardinality.UNARY_UNARY:
+ return _UnaryUnarySyncAsync(underlying_attr)
+ elif cardinality is interfaces.Cardinality.UNARY_STREAM:
+ return lambda request, timeout: _CancellableIterator(
+ underlying_attr(request, timeout))
+ elif cardinality is interfaces.Cardinality.STREAM_UNARY:
+ return _StreamUnarySyncAsync(underlying_attr)
+ elif cardinality is interfaces.Cardinality.STREAM_STREAM:
+ return lambda request_iterator, timeout: _CancellableIterator(
+ underlying_attr(request_iterator, timeout))
+ else:
+ raise AttributeError(attr)
+
+def rpc_context(face_rpc_context):
+ return _RpcContext(face_rpc_context)
+
+
+def stub(assembly_stub, cardinalities):
+ return _Stub(assembly_stub, cardinalities)
diff --git a/src/python/src/grpc/early_adopter/exceptions.py b/src/python/src/grpc/early_adopter/exceptions.py
new file mode 100644
index 0000000000..5234d3b91c
--- /dev/null
+++ b/src/python/src/grpc/early_adopter/exceptions.py
@@ -0,0 +1,48 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Exceptions raised by GRPC.
+
+Only GRPC should instantiate and raise these exceptions.
+"""
+
+import abc
+
+
+class RpcError(Exception):
+ """Common super type for all exceptions raised by GRPC."""
+ __metaclass__ = abc.ABCMeta
+
+
+class CancellationError(RpcError):
+ """Indicates that an RPC has been cancelled."""
+
+
+class ExpirationError(RpcError):
+ """Indicates that an RPC has expired ("timed out")."""
diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py
index 1d76d0f9e0..241ed7dcdb 100644
--- a/src/python/src/grpc/early_adopter/implementations.py
+++ b/src/python/src/grpc/early_adopter/implementations.py
@@ -31,15 +31,12 @@
import threading
-from grpc._adapter import fore
-from grpc.framework.base.packets import implementations as _tickets_implementations
-from grpc.framework.face import implementations as _face_implementations
-from grpc.framework.foundation import logging_pool
-from grpc.early_adopter import _face_utilities
+from grpc._adapter import fore as _fore
+from grpc._adapter import rear as _rear
+from grpc.early_adopter import _assembly_utilities
+from grpc.early_adopter import _reexport
from grpc.early_adopter import interfaces
-
-_MEGA_TIMEOUT = 60 * 60 * 24
-_THREAD_POOL_SIZE = 80
+from grpc.framework.assembly import implementations as _assembly_implementations
class _Server(interfaces.Server):
@@ -48,63 +45,120 @@ class _Server(interfaces.Server):
self._lock = threading.Lock()
self._breakdown = breakdown
self._port = port
- self._private_key = private_key
- self._certificate_chain = certificate_chain
+ if private_key is None or certificate_chain is None:
+ self._key_chain_pairs = ()
+ else:
+ self._key_chain_pairs = ((private_key, certificate_chain),)
- self._pool = None
self._fore_link = None
- self._back = None
+ self._server = None
- def start(self):
- """See interfaces.Server.start for specification."""
+ def _start(self):
with self._lock:
- if self._pool is None:
- self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
- servicer = _face_implementations.servicer(
- self._pool,
- inline_value_in_value_out_methods=self._breakdown.unary_unary_methods,
- inline_value_in_stream_out_methods=self._breakdown.unary_stream_methods,
- inline_stream_in_value_out_methods=self._breakdown.stream_unary_methods,
- inline_stream_in_stream_out_methods=self._breakdown.stream_stream_methods)
- self._fore_link = fore.ForeLink(
- self._pool, self._breakdown.request_deserializers,
- self._breakdown.response_serializers, None,
- ((self._private_key, self._certificate_chain),), port=self._port)
- self._fore_link.start()
- port = self._fore_link.port()
- self._back = _tickets_implementations.back(
- servicer, self._pool, self._pool, self._pool, _MEGA_TIMEOUT,
- _MEGA_TIMEOUT)
- self._fore_link.join_rear_link(self._back)
- self._back.join_fore_link(self._fore_link)
- return port
+ if self._server is None:
+ self._fore_link = _fore.activated_fore_link(
+ self._port, self._breakdown.request_deserializers,
+ self._breakdown.response_serializers, None, self._key_chain_pairs)
+
+ self._server = _assembly_implementations.assemble_service(
+ self._breakdown.implementations, self._fore_link)
+ self._server.start()
else:
raise ValueError('Server currently running!')
- def stop(self):
- """See interfaces.Server.stop for specification."""
+ def _stop(self):
with self._lock:
- if self._pool is None:
+ if self._server is None:
raise ValueError('Server not running!')
else:
- self._fore_link.stop()
- self._pool.shutdown(wait=True)
- self._pool = None
+ self._server.stop()
+ self._server = None
+ self._fore_link = None
+
+ def __enter__(self):
+ self._start()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._stop()
+ return False
+
+ def start(self):
+ self._start()
+
+ def stop(self):
+ self._stop()
+
+ def port(self):
+ with self._lock:
+ return self._fore_link.port()
+
+def _build_stub(
+ methods, host, port, root_certificates, private_key, certificate_chain):
+ breakdown = _assembly_utilities.break_down_invocation(methods)
+ # TODO(nathaniel): pass security values.
+ activated_rear_link = _rear.activated_rear_link(
+ host, port, breakdown.request_serializers,
+ breakdown.response_deserializers)
+ assembly_stub = _assembly_implementations.assemble_dynamic_inline_stub(
+ breakdown.implementations, activated_rear_link)
+ return _reexport.stub(assembly_stub, breakdown.cardinalities)
def _build_server(methods, port, private_key, certificate_chain):
- breakdown = _face_utilities.server_break_down(methods)
+ breakdown = _assembly_utilities.break_down_service(methods)
return _Server(breakdown, port, private_key, certificate_chain)
+def insecure_stub(methods, host, port):
+ """Constructs an insecure interfaces.Stub.
+
+ Args:
+ methods: A dictionary from RPC method name to
+ interfaces.RpcMethodInvocationDescription describing the RPCs to be
+ supported by the created stub.
+ host: The host to which to connect for RPC service.
+ port: The port to which to connect for RPC service.
+
+ Returns:
+ An interfaces.Stub affording RPC invocation.
+ """
+ return _build_stub(methods, host, port, None, None, None)
+
+
+def secure_stub(
+ methods, host, port, root_certificates, private_key, certificate_chain):
+ """Constructs an insecure interfaces.Stub.
+
+ Args:
+ methods: A dictionary from RPC method name to
+ interfaces.RpcMethodInvocationDescription describing the RPCs to be
+ supported by the created stub.
+ host: The host to which to connect for RPC service.
+ port: The port to which to connect for RPC service.
+ root_certificates: The PEM-encoded root certificates or None to ask for
+ them to be retrieved from a default location.
+ private_key: The PEM-encoded private key to use or None if no private key
+ should be used.
+ certificate_chain: The PEM-encoded certificate chain to use or None if no
+ certificate chain should be used.
+
+ Returns:
+ An interfaces.Stub affording RPC invocation.
+ """
+ return _build_stub(
+ methods, host, port, root_certificates, private_key, certificate_chain)
+
+
def insecure_server(methods, port):
"""Constructs an insecure interfaces.Server.
Args:
methods: A dictionary from RPC method name to
- interfaces.ServerRpcMethod object describing the RPCs to
+ interfaces.RpcMethodServiceDescription describing the RPCs to
be serviced by the created server.
- port: The port on which to serve.
+ port: The desired port on which to serve or zero to ask for a port to
+ be automatically selected.
Returns:
An interfaces.Server that will run with no security and
@@ -118,9 +172,10 @@ def secure_server(methods, port, private_key, certificate_chain):
Args:
methods: A dictionary from RPC method name to
- interfaces.ServerRpcMethod object describing the RPCs to
+ interfaces.RpcMethodServiceDescription describing the RPCs to
be serviced by the created server.
- port: The port on which to serve.
+ port: The port on which to serve or zero to ask for a port to be
+ automatically selected.
private_key: A pem-encoded private key.
certificate_chain: A pem-encoded certificate chain.
diff --git a/src/python/src/grpc/early_adopter/implementations_test.py b/src/python/src/grpc/early_adopter/implementations_test.py
new file mode 100644
index 0000000000..9ef06c32cb
--- /dev/null
+++ b/src/python/src/grpc/early_adopter/implementations_test.py
@@ -0,0 +1,176 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# TODO(nathaniel): Expand this test coverage.
+
+"""Test of the GRPC-backed ForeLink and RearLink."""
+
+import unittest
+
+from grpc.early_adopter import implementations
+from grpc.early_adopter import utilities
+from grpc._junkdrawer import math_pb2
+
+DIV = 'Div'
+DIV_MANY = 'DivMany'
+FIB = 'Fib'
+SUM = 'Sum'
+
+def _fibbonacci(limit):
+ left, right = 0, 1
+ for _ in xrange(limit):
+ yield left
+ left, right = right, left + right
+
+
+def _div(request, unused_context):
+ return math_pb2.DivReply(
+ quotient=request.dividend / request.divisor,
+ remainder=request.dividend % request.divisor)
+
+
+def _div_many(request_iterator, unused_context):
+ for request in request_iterator:
+ yield math_pb2.DivReply(
+ quotient=request.dividend / request.divisor,
+ remainder=request.dividend % request.divisor)
+
+
+def _fib(request, unused_context):
+ for number in _fibbonacci(request.limit):
+ yield math_pb2.Num(num=number)
+
+
+def _sum(request_iterator, unused_context):
+ accumulation = 0
+ for request in request_iterator:
+ accumulation += request.num
+ return math_pb2.Num(num=accumulation)
+
+
+_INVOCATION_DESCRIPTIONS = {
+ DIV: utilities.unary_unary_invocation_description(
+ math_pb2.DivArgs.SerializeToString, math_pb2.DivReply.FromString),
+ DIV_MANY: utilities.stream_stream_invocation_description(
+ math_pb2.DivArgs.SerializeToString, math_pb2.DivReply.FromString),
+ FIB: utilities.unary_stream_invocation_description(
+ math_pb2.FibArgs.SerializeToString, math_pb2.Num.FromString),
+ SUM: utilities.stream_unary_invocation_description(
+ math_pb2.Num.SerializeToString, math_pb2.Num.FromString),
+}
+
+_SERVICE_DESCRIPTIONS = {
+ DIV: utilities.unary_unary_service_description(
+ _div, math_pb2.DivArgs.FromString,
+ math_pb2.DivReply.SerializeToString),
+ DIV_MANY: utilities.stream_stream_service_description(
+ _div_many, math_pb2.DivArgs.FromString,
+ math_pb2.DivReply.SerializeToString),
+ FIB: utilities.unary_stream_service_description(
+ _fib, math_pb2.FibArgs.FromString, math_pb2.Num.SerializeToString),
+ SUM: utilities.stream_unary_service_description(
+ _sum, math_pb2.Num.FromString, math_pb2.Num.SerializeToString),
+}
+
+_TIMEOUT = 3
+
+
+class EarlyAdopterImplementationsTest(unittest.TestCase):
+
+ def setUp(self):
+ self.server = implementations.insecure_server(_SERVICE_DESCRIPTIONS, 0)
+ self.server.start()
+ port = self.server.port()
+ self.stub = implementations.insecure_stub(_INVOCATION_DESCRIPTIONS, 'localhost', port)
+
+ def tearDown(self):
+ self.server.stop()
+
+ def testUpAndDown(self):
+ with self.stub:
+ pass
+
+ def testUnaryUnary(self):
+ divisor = 59
+ dividend = 973
+ expected_quotient = dividend / divisor
+ expected_remainder = dividend % divisor
+
+ with self.stub:
+ response = self.stub.Div(
+ math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT)
+ self.assertEqual(expected_quotient, response.quotient)
+ self.assertEqual(expected_remainder, response.remainder)
+
+ def testUnaryStream(self):
+ stream_length = 43
+
+ with self.stub:
+ response_iterator = self.stub.Fib(
+ math_pb2.FibArgs(limit=stream_length), _TIMEOUT)
+ numbers = tuple(response.num for response in response_iterator)
+ for early, middle, later in zip(numbers, numbers[:1], numbers[:2]):
+ self.assertEqual(early + middle, later)
+ self.assertEqual(stream_length, len(numbers))
+
+ def testStreamUnary(self):
+ stream_length = 127
+
+ with self.stub:
+ response_future = self.stub.Sum.async(
+ (math_pb2.Num(num=index) for index in range(stream_length)),
+ _TIMEOUT)
+ self.assertEqual(
+ (stream_length * (stream_length - 1)) / 2,
+ response_future.result().num)
+
+ def testStreamStream(self):
+ stream_length = 179
+ divisor_offset = 71
+ dividend_offset = 1763
+
+ with self.stub:
+ response_iterator = self.stub.DivMany(
+ (math_pb2.DivArgs(
+ divisor=divisor_offset + index,
+ dividend=dividend_offset + index)
+ for index in range(stream_length)),
+ _TIMEOUT)
+ for index, response in enumerate(response_iterator):
+ self.assertEqual(
+ (dividend_offset + index) / (divisor_offset + index),
+ response.quotient)
+ self.assertEqual(
+ (dividend_offset + index) % (divisor_offset + index),
+ response.remainder)
+ self.assertEqual(stream_length, index + 1)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/python/src/grpc/early_adopter/interfaces.py b/src/python/src/grpc/early_adopter/interfaces.py
index 0ec371f8e8..b733873c1c 100644
--- a/src/python/src/grpc/early_adopter/interfaces.py
+++ b/src/python/src/grpc/early_adopter/interfaces.py
@@ -32,6 +32,11 @@
import abc
import enum
+# exceptions is referenced from specification in this module.
+from grpc.early_adopter import exceptions # pylint: disable=unused-import
+from grpc.framework.foundation import activated
+from grpc.framework.foundation import future
+
@enum.unique
class Cardinality(enum.Enum):
@@ -43,24 +48,166 @@ class Cardinality(enum.Enum):
STREAM_STREAM = 'request-streaming/response-streaming'
-class RpcMethod(object):
- """A type for the common aspects of RPC method specifications."""
+@enum.unique
+class Abortion(enum.Enum):
+ """Categories of RPC abortion."""
+
+ CANCELLED = 'cancelled'
+ EXPIRED = 'expired'
+ NETWORK_FAILURE = 'network failure'
+ SERVICED_FAILURE = 'serviced failure'
+ SERVICER_FAILURE = 'servicer failure'
+
+
+class CancellableIterator(object):
+ """Implements the Iterator protocol and affords a cancel method."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __iter__(self):
+ """Returns the self object in accordance with the Iterator protocol."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def next(self):
+ """Returns a value or raises StopIteration per the Iterator protocol."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Requests cancellation of whatever computation underlies this iterator."""
+ raise NotImplementedError()
+
+
+class RpcContext(object):
+ """Provides RPC-related information and action."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def is_active(self):
+ """Describes whether the RPC is active or has terminated."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def time_remaining(self):
+ """Describes the length of allowed time remaining for the RPC.
+ Returns:
+ A nonnegative float indicating the length of allowed time in seconds
+ remaining for the RPC to complete before it is considered to have timed
+ out.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_abortion_callback(self, abortion_callback):
+ """Registers a callback to be called if the RPC is aborted.
+ Args:
+ abortion_callback: A callable to be called and passed an Abortion value
+ in the event of RPC abortion.
+ """
+ raise NotImplementedError()
+
+
+class UnaryUnarySyncAsync(object):
+ """Affords invoking a unary-unary RPC synchronously or asynchronously.
+ Values implementing this interface are directly callable and present an
+ "async" method. Both calls take a request value and a numeric timeout.
+ Direct invocation of a value of this type invokes its associated RPC and
+ blocks until the RPC's response is available. Calling the "async" method
+ of a value of this type invokes its associated RPC and immediately returns a
+ future.Future bound to the asynchronous execution of the RPC.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __call__(self, request, timeout):
+ """Synchronously invokes the underlying RPC.
+ Args:
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ Returns:
+ The response value for the RPC.
+ Raises:
+ exceptions.RpcError: Indicating that the RPC was aborted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def async(self, request, timeout):
+ """Asynchronously invokes the underlying RPC.
+ Args:
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+ Returns:
+ A future.Future representing the RPC. In the event of RPC completion, the
+ returned Future's result value will be the response value of the RPC.
+ In the event of RPC abortion, the returned Future's exception value
+ will be an exceptions.RpcError.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnarySyncAsync(object):
+ """Affords invoking a stream-unary RPC synchronously or asynchronously.
+ Values implementing this interface are directly callable and present an
+ "async" method. Both calls take an iterator of request values and a numeric
+ timeout. Direct invocation of a value of this type invokes its associated RPC
+ and blocks until the RPC's response is available. Calling the "async" method
+ of a value of this type invokes its associated RPC and immediately returns a
+ future.Future bound to the asynchronous execution of the RPC.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __call__(self, request_iterator, timeout):
+ """Synchronously invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ The response value for the RPC.
+
+ Raises:
+ exceptions.RpcError: Indicating that the RPC was aborted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def async(self, request_iterator, timeout):
+ """Asynchronously invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ A future.Future representing the RPC. In the event of RPC completion, the
+ returned Future's result value will be the response value of the RPC.
+ In the event of RPC abortion, the returned Future's exception value
+ will be an exceptions.RpcError.
+ """
+ raise NotImplementedError()
+
+
+class RpcMethodDescription(object):
+ """A type for the common aspects of RPC method descriptions."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def cardinality(self):
- """Identifies the cardinality of this RpcMethod.
+ """Identifies the cardinality of this RpcMethodDescription.
Returns:
A Cardinality value identifying whether or not this
- RpcMethod is request-unary or request-streaming and
- whether or not it is response-unary or
- response-streaming.
+ RpcMethodDescription is request-unary or request-streaming and
+ whether or not it is response-unary or response-streaming.
"""
raise NotImplementedError()
-class ClientRpcMethod(RpcMethod):
+class RpcMethodInvocationDescription(RpcMethodDescription):
"""Invocation-side description of an RPC method."""
__metaclass__ = abc.ABCMeta
@@ -69,7 +216,8 @@ class ClientRpcMethod(RpcMethod):
"""Serializes a request value.
Args:
- request: A request value appropriate for this RpcMethod.
+ request: A request value appropriate for the RPC method described by this
+ RpcMethodInvocationDescription.
Returns:
The serialization of the given request value as a
@@ -82,9 +230,9 @@ class ClientRpcMethod(RpcMethod):
"""Deserializes a response value.
Args:
- serialized_response: A bytestring that is the
- serialization of a response value appropriate for this
- RpcMethod.
+ serialized_response: A bytestring that is the serialization of a response
+ value appropriate for the RPC method described by this
+ RpcMethodInvocationDescription.
Returns:
A response value corresponding to the given bytestring.
@@ -92,7 +240,7 @@ class ClientRpcMethod(RpcMethod):
raise NotImplementedError()
-class ServerRpcMethod(RpcMethod):
+class RpcMethodServiceDescription(RpcMethodDescription):
"""Service-side description of an RPC method."""
__metaclass__ = abc.ABCMeta
@@ -101,9 +249,9 @@ class ServerRpcMethod(RpcMethod):
"""Deserializes a request value.
Args:
- serialized_request: A bytestring that is the
- serialization of a request value appropriate for this
- RpcMethod.
+ serialized_request: A bytestring that is the serialization of a request
+ value appropriate for the RPC method described by this
+ RpcMethodServiceDescription.
Returns:
A request value corresponding to the given bytestring.
@@ -115,7 +263,8 @@ class ServerRpcMethod(RpcMethod):
"""Serializes a response value.
Args:
- response: A response value appropriate for this RpcMethod.
+ response: A response value appropriate for the RPC method described by
+ this RpcMethodServiceDescription.
Returns:
The serialization of the given response value as a
@@ -124,80 +273,116 @@ class ServerRpcMethod(RpcMethod):
raise NotImplementedError()
@abc.abstractmethod
- def service_unary_unary(self, request):
+ def service_unary_unary(self, request, context):
"""Carries out this RPC.
This method may only be called if the cardinality of this
- RpcMethod is Cardinality.UNARY_UNARY.
+ RpcMethodServiceDescription is Cardinality.UNARY_UNARY.
Args:
- request: A request value appropriate for this RpcMethod.
+ request: A request value appropriate for the RPC method described by this
+ RpcMethodServiceDescription.
+ context: An RpcContext object for the RPC.
Returns:
- A response value appropriate for this RpcMethod.
+ A response value appropriate for the RPC method described by this
+ RpcMethodServiceDescription.
"""
raise NotImplementedError()
@abc.abstractmethod
- def service_unary_stream(self, request):
+ def service_unary_stream(self, request, context):
"""Carries out this RPC.
This method may only be called if the cardinality of this
- RpcMethod is Cardinality.UNARY_STREAM.
+ RpcMethodServiceDescription is Cardinality.UNARY_STREAM.
Args:
- request: A request value appropriate for this RpcMethod.
+ request: A request value appropriate for the RPC method described by this
+ RpcMethodServiceDescription.
+ context: An RpcContext object for the RPC.
Yields:
- Zero or more response values appropriate for this
- RpcMethod.
+ Zero or more response values appropriate for the RPC method described by
+ this RpcMethodServiceDescription.
"""
raise NotImplementedError()
@abc.abstractmethod
- def service_stream_unary(self, request_iterator):
+ def service_stream_unary(self, request_iterator, context):
"""Carries out this RPC.
This method may only be called if the cardinality of this
- RpcMethod is Cardinality.STREAM_UNARY.
+ RpcMethodServiceDescription is Cardinality.STREAM_UNARY.
Args:
- request_iterator: An iterator of request values
- appropriate for this RpcMethod.
+ request_iterator: An iterator of request values appropriate for the RPC
+ method described by this RpcMethodServiceDescription.
+ context: An RpcContext object for the RPC.
Returns:
- A response value appropriate for this RpcMethod.
+ A response value appropriate for the RPC method described by this
+ RpcMethodServiceDescription.
"""
raise NotImplementedError()
@abc.abstractmethod
- def service_stream_stream(self, request_iterator):
+ def service_stream_stream(self, request_iterator, context):
"""Carries out this RPC.
This method may only be called if the cardinality of this
- RpcMethod is Cardinality.STREAM_STREAM.
+ RpcMethodServiceDescription is Cardinality.STREAM_STREAM.
Args:
- request_iterator: An iterator of request values
- appropriate for this RpcMethod.
+ request_iterator: An iterator of request values appropriate for the RPC
+ method described by this RpcMethodServiceDescription.
+ context: An RpcContext object for the RPC.
Yields:
- Zero or more response values appropraite for this
- RpcMethod.
+ Zero or more response values appropriate for the RPC method described by
+ this RpcMethodServiceDescription.
"""
raise NotImplementedError()
-class Server(object):
+class Stub(object):
+ """A stub with callable RPC method names for attributes.
+
+ Instances of this type are context managers and only afford RPC invocation
+ when used in context.
+
+ Instances of this type, when used in context, respond to attribute access
+ as follows: if the requested attribute is the name of a unary-unary RPC
+ method, the value of the attribute will be a UnaryUnarySyncAsync with which
+ to invoke the RPC method. If the requested attribute is the name of a
+ unary-stream RPC method, the value of the attribute will be a callable taking
+ a request object and a timeout parameter and returning a CancellableIterator
+ that yields the response values of the RPC. If the requested attribute is the
+ name of a stream-unary RPC method, the value of the attribute will be a
+ StreamUnarySyncAsync with which to invoke the RPC method. If the requested
+ attribute is the name of a stream-stream RPC method, the value of the
+ attribute will be a callable taking an iterator of request objects and a
+ timeout and returning a CancellableIterator that yields the response values
+ of the RPC.
+
+ In all cases indication of abortion is indicated by raising of
+ exceptions.RpcError, exceptions.CancellationError,
+ and exceptions.ExpirationError.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class Server(activated.Activated):
"""A GRPC Server."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
- def start(self):
- """Instructs this server to commence service of RPCs."""
- raise NotImplementedError()
+ def port(self):
+ """Reports the port on which the server is serving.
- @abc.abstractmethod
- def stop(self):
- """Instructs this server to halt service of RPCs."""
+ This method may only be called while the server is activated.
+
+ Returns:
+ The port on which the server is serving.
+ """
raise NotImplementedError()
diff --git a/src/python/src/grpc/early_adopter/utilities.py b/src/python/src/grpc/early_adopter/utilities.py
index 9277d3f6ad..da8ef825aa 100644
--- a/src/python/src/grpc/early_adopter/utilities.py
+++ b/src/python/src/grpc/early_adopter/utilities.py
@@ -32,7 +32,9 @@
from grpc.early_adopter import interfaces
-class _RpcMethod(interfaces.ClientRpcMethod, interfaces.ServerRpcMethod):
+class _RpcMethodDescription(
+ interfaces.RpcMethodInvocationDescription,
+ interfaces.RpcMethodServiceDescription):
def __init__(
self, cardinality, unary_unary, unary_stream, stream_unary,
@@ -49,44 +51,45 @@ class _RpcMethod(interfaces.ClientRpcMethod, interfaces.ServerRpcMethod):
self._response_deserializer = response_deserializer
def cardinality(self):
- """See interfaces.RpcMethod.cardinality for specification."""
+ """See interfaces.RpcMethodDescription.cardinality for specification."""
return self._cardinality
def serialize_request(self, request):
- """See interfaces.RpcMethod.serialize_request for specification."""
+ """See interfaces.RpcMethodInvocationDescription.serialize_request."""
return self._request_serializer(request)
def deserialize_request(self, serialized_request):
- """See interfaces.RpcMethod.deserialize_request for specification."""
+ """See interfaces.RpcMethodServiceDescription.deserialize_request."""
return self._request_deserializer(serialized_request)
def serialize_response(self, response):
- """See interfaces.RpcMethod.serialize_response for specification."""
+ """See interfaces.RpcMethodServiceDescription.serialize_response."""
return self._response_serializer(response)
def deserialize_response(self, serialized_response):
- """See interfaces.RpcMethod.deserialize_response for specification."""
+ """See interfaces.RpcMethodInvocationDescription.deserialize_response."""
return self._response_deserializer(serialized_response)
- def service_unary_unary(self, request):
- """See interfaces.RpcMethod.service_unary_unary for specification."""
- return self._unary_unary(request)
+ def service_unary_unary(self, request, context):
+ """See interfaces.RpcMethodServiceDescription.service_unary_unary."""
+ return self._unary_unary(request, context)
- def service_unary_stream(self, request):
- """See interfaces.RpcMethod.service_unary_stream for specification."""
- return self._unary_stream(request)
+ def service_unary_stream(self, request, context):
+ """See interfaces.RpcMethodServiceDescription.service_unary_stream."""
+ return self._unary_stream(request, context)
- def service_stream_unary(self, request_iterator):
- """See interfaces.RpcMethod.service_stream_unary for specification."""
- return self._stream_unary(request_iterator)
+ def service_stream_unary(self, request_iterator, context):
+ """See interfaces.RpcMethodServiceDescription.service_stream_unary."""
+ return self._stream_unary(request_iterator, context)
- def service_stream_stream(self, request_iterator):
- """See interfaces.RpcMethod.service_stream_stream for specification."""
- return self._stream_stream(request_iterator)
+ def service_stream_stream(self, request_iterator, context):
+ """See interfaces.RpcMethodServiceDescription.service_stream_stream."""
+ return self._stream_stream(request_iterator, context)
-def unary_unary_client_rpc_method(request_serializer, response_deserializer):
- """Constructs an interfaces.ClientRpcMethod for a unary-unary RPC method.
+def unary_unary_invocation_description(
+ request_serializer, response_deserializer):
+ """Creates an interfaces.RpcMethodInvocationDescription for an RPC method.
Args:
request_serializer: A callable that when called on a request
@@ -96,17 +99,17 @@ def unary_unary_client_rpc_method(request_serializer, response_deserializer):
that bytestring.
Returns:
- An interfaces.ClientRpcMethod constructed from the given
- arguments representing a unary-request/unary-response RPC
- method.
+ An interfaces.RpcMethodInvocationDescription constructed from the given
+ arguments representing a unary-request/unary-response RPC method.
"""
- return _RpcMethod(
+ return _RpcMethodDescription(
interfaces.Cardinality.UNARY_UNARY, None, None, None, None,
request_serializer, None, None, response_deserializer)
-def unary_stream_client_rpc_method(request_serializer, response_deserializer):
- """Constructs an interfaces.ClientRpcMethod for a unary-stream RPC method.
+def unary_stream_invocation_description(
+ request_serializer, response_deserializer):
+ """Creates an interfaces.RpcMethodInvocationDescription for an RPC method.
Args:
request_serializer: A callable that when called on a request
@@ -116,17 +119,17 @@ def unary_stream_client_rpc_method(request_serializer, response_deserializer):
that bytestring.
Returns:
- An interfaces.ClientRpcMethod constructed from the given
- arguments representing a unary-request/streaming-response
- RPC method.
+ An interfaces.RpcMethodInvocationDescription constructed from the given
+ arguments representing a unary-request/streaming-response RPC method.
"""
- return _RpcMethod(
+ return _RpcMethodDescription(
interfaces.Cardinality.UNARY_STREAM, None, None, None, None,
request_serializer, None, None, response_deserializer)
-def stream_unary_client_rpc_method(request_serializer, response_deserializer):
- """Constructs an interfaces.ClientRpcMethod for a stream-unary RPC method.
+def stream_unary_invocation_description(
+ request_serializer, response_deserializer):
+ """Creates an interfaces.RpcMethodInvocationDescription for an RPC method.
Args:
request_serializer: A callable that when called on a request
@@ -136,17 +139,17 @@ def stream_unary_client_rpc_method(request_serializer, response_deserializer):
that bytestring.
Returns:
- An interfaces.ClientRpcMethod constructed from the given
- arguments representing a streaming-request/unary-response
- RPC method.
+ An interfaces.RpcMethodInvocationDescription constructed from the given
+ arguments representing a streaming-request/unary-response RPC method.
"""
- return _RpcMethod(
+ return _RpcMethodDescription(
interfaces.Cardinality.STREAM_UNARY, None, None, None, None,
request_serializer, None, None, response_deserializer)
-def stream_stream_client_rpc_method(request_serializer, response_deserializer):
- """Constructs an interfaces.ClientRpcMethod for a stream-stream RPC method.
+def stream_stream_invocation_description(
+ request_serializer, response_deserializer):
+ """Creates an interfaces.RpcMethodInvocationDescription for an RPC method.
Args:
request_serializer: A callable that when called on a request
@@ -156,23 +159,23 @@ def stream_stream_client_rpc_method(request_serializer, response_deserializer):
that bytestring.
Returns:
- An interfaces.ClientRpcMethod constructed from the given
- arguments representing a
- streaming-request/streaming-response RPC method.
+ An interfaces.RpcMethodInvocationDescription constructed from the given
+ arguments representing a streaming-request/streaming-response RPC
+ method.
"""
- return _RpcMethod(
+ return _RpcMethodDescription(
interfaces.Cardinality.STREAM_STREAM, None, None, None, None,
request_serializer, None, None, response_deserializer)
-def unary_unary_server_rpc_method(
+def unary_unary_service_description(
behavior, request_deserializer, response_serializer):
- """Constructs an interfaces.ServerRpcMethod for the given behavior.
+ """Creates an interfaces.RpcMethodServiceDescription for the given behavior.
Args:
behavior: A callable that implements a unary-unary RPC
- method that accepts a single request and returns a single
- response.
+ method that accepts a single request and an interfaces.RpcContext and
+ returns a single response.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
@@ -181,23 +184,23 @@ def unary_unary_server_rpc_method(
that value.
Returns:
- An interfaces.ServerRpcMethod constructed from the given
+ An interfaces.RpcMethodServiceDescription constructed from the given
arguments representing a unary-request/unary-response RPC
method.
"""
- return _RpcMethod(
+ return _RpcMethodDescription(
interfaces.Cardinality.UNARY_UNARY, behavior, None, None, None,
None, request_deserializer, response_serializer, None)
-def unary_stream_server_rpc_method(
+def unary_stream_service_description(
behavior, request_deserializer, response_serializer):
- """Constructs an interfaces.ServerRpcMethod for the given behavior.
+ """Creates an interfaces.RpcMethodServiceDescription for the given behavior.
Args:
behavior: A callable that implements a unary-stream RPC
- method that accepts a single request and returns an
- iterator of zero or more responses.
+ method that accepts a single request and an interfaces.RpcContext
+ and returns an iterator of zero or more responses.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
@@ -206,23 +209,23 @@ def unary_stream_server_rpc_method(
that value.
Returns:
- An interfaces.ServerRpcMethod constructed from the given
+ An interfaces.RpcMethodServiceDescription constructed from the given
arguments representing a unary-request/streaming-response
RPC method.
"""
- return _RpcMethod(
+ return _RpcMethodDescription(
interfaces.Cardinality.UNARY_STREAM, None, behavior, None, None,
None, request_deserializer, response_serializer, None)
-def stream_unary_server_rpc_method(
+def stream_unary_service_description(
behavior, request_deserializer, response_serializer):
- """Constructs an interfaces.ServerRpcMethod for the given behavior.
+ """Creates an interfaces.RpcMethodServiceDescription for the given behavior.
Args:
behavior: A callable that implements a stream-unary RPC
method that accepts an iterator of zero or more requests
- and returns a single response.
+ and an interfaces.RpcContext and returns a single response.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
@@ -231,23 +234,24 @@ def stream_unary_server_rpc_method(
that value.
Returns:
- An interfaces.ServerRpcMethod constructed from the given
+ An interfaces.RpcMethodServiceDescription constructed from the given
arguments representing a streaming-request/unary-response
RPC method.
"""
- return _RpcMethod(
+ return _RpcMethodDescription(
interfaces.Cardinality.STREAM_UNARY, None, None, behavior, None,
None, request_deserializer, response_serializer, None)
-def stream_stream_server_rpc_method(
+def stream_stream_service_description(
behavior, request_deserializer, response_serializer):
- """Constructs an interfaces.ServerRpcMethod for the given behavior.
+ """Creates an interfaces.RpcMethodServiceDescription for the given behavior.
Args:
behavior: A callable that implements a stream-stream RPC
method that accepts an iterator of zero or more requests
- and returns an iterator of zero or more responses.
+ and an interfaces.RpcContext and returns an iterator of
+ zero or more responses.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
@@ -256,10 +260,10 @@ def stream_stream_server_rpc_method(
that value.
Returns:
- An interfaces.ServerRpcMethod constructed from the given
+ An interfaces.RpcMethodServiceDescription constructed from the given
arguments representing a
streaming-request/streaming-response RPC method.
"""
- return _RpcMethod(
+ return _RpcMethodDescription(
interfaces.Cardinality.STREAM_STREAM, None, None, None, behavior,
None, request_deserializer, response_serializer, None)