aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2015-02-20 20:42:01 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2015-02-20 23:05:32 +0000
commit7efe54e3367ead56d46f83efece67685ad03fd05 (patch)
treee135e2a5e22c5820d72edada2622389e1c47cc9c /src
parentb681d0adba74e8f870082a15103b71038dd5beb0 (diff)
The framework.assembly package.
This provides for now what should be a nicer interface for code generation than that of the Face layer. In terms of abstraction it's conceptually very similar so the two should probably be merged as soon as is reasonable.
Diffstat (limited to 'src')
-rw-r--r--src/python/src/grpc/_adapter/_face_test_case.py3
-rw-r--r--src/python/src/grpc/_adapter/_links_test.py9
-rw-r--r--src/python/src/grpc/_adapter/fore.py50
-rw-r--r--src/python/src/grpc/_adapter/rear.py25
-rw-r--r--src/python/src/grpc/early_adopter/implementations.py3
-rw-r--r--src/python/src/grpc/framework/assembly/__init__.py30
-rw-r--r--src/python/src/grpc/framework/assembly/implementations.py305
-rw-r--r--src/python/src/grpc/framework/assembly/implementations_test.py284
-rw-r--r--src/python/src/grpc/framework/assembly/interfaces.py91
-rw-r--r--src/python/src/grpc/framework/assembly/utilities.py179
-rw-r--r--src/python/src/grpc/framework/common/style.py40
-rw-r--r--src/python/src/grpc/framework/face/utilities.py221
-rw-r--r--src/python/src/grpc/framework/foundation/activated.py65
-rw-r--r--src/python/src/setup.py3
14 files changed, 1290 insertions, 18 deletions
diff --git a/src/python/src/grpc/_adapter/_face_test_case.py b/src/python/src/grpc/_adapter/_face_test_case.py
index da73366f44..8cce322d30 100644
--- a/src/python/src/grpc/_adapter/_face_test_case.py
+++ b/src/python/src/grpc/_adapter/_face_test_case.py
@@ -81,7 +81,8 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage):
fore_link = fore.ForeLink(
pool, serialization.request_deserializers,
serialization.response_serializers, None, ())
- port = fore_link.start()
+ fore_link.start()
+ port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, pool,
serialization.request_serializers, serialization.response_deserializers)
diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py
index ba7660bb92..6b3bcee9fa 100644
--- a/src/python/src/grpc/_adapter/_links_test.py
+++ b/src/python/src/grpc/_adapter/_links_test.py
@@ -70,7 +70,8 @@ class RoundTripTest(unittest.TestCase):
self.fore_link_pool, {test_method: None}, {test_method: None}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
- port = fore_link.start()
+ fore_link.start()
+ port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool, {test_method: None},
@@ -123,7 +124,8 @@ class RoundTripTest(unittest.TestCase):
{test_method: _IDENTITY}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
- port = fore_link.start()
+ fore_link.start()
+ port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool, {test_method: _IDENTITY},
@@ -185,7 +187,8 @@ class RoundTripTest(unittest.TestCase):
{test_method: scenario.serialize_response}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
- port = fore_link.start()
+ fore_link.start()
+ port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool,
diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py
index f72b2fd5a5..051fc083f1 100644
--- a/src/python/src/grpc/_adapter/fore.py
+++ b/src/python/src/grpc/_adapter/fore.py
@@ -40,6 +40,7 @@ from grpc.framework.base import interfaces
from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets
+from grpc.framework.foundation import activated
@enum.unique
@@ -65,7 +66,7 @@ def _status(call, rpc_state):
rpc_state.write.low = _LowWrite.CLOSED
-class ForeLink(ticket_interfaces.ForeLink):
+class ForeLink(ticket_interfaces.ForeLink, activated.Activated):
"""A service-side bridge between RPC Framework and the C-ish _low code."""
def __init__(
@@ -92,13 +93,14 @@ class ForeLink(ticket_interfaces.ForeLink):
self._response_serializers = response_serializers
self._root_certificates = root_certificates
self._key_chain_pairs = key_chain_pairs
- self._port = port
+ self._requested_port = port
self._rear_link = null.NULL_REAR_LINK
self._completion_queue = None
self._server = None
self._rpc_states = {}
self._spinning = False
+ self._port = None
def _on_stop_event(self):
self._spinning = False
@@ -264,23 +266,24 @@ class ForeLink(ticket_interfaces.ForeLink):
"""See ticket_interfaces.ForeLink.join_rear_link for specification."""
self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
- def start(self):
+ def _start(self):
"""Starts this ForeLink.
This method must be called before attempting to exchange tickets with this
object.
"""
with self._condition:
- address = '[::]:%d' % (0 if self._port is None else self._port)
+ address = '[::]:%d' % (
+ 0 if self._requested_port is None else self._requested_port)
self._completion_queue = _low.CompletionQueue()
if self._root_certificates is None and not self._key_chain_pairs:
self._server = _low.Server(self._completion_queue, None)
- port = self._server.add_http2_addr(address)
+ self._port = self._server.add_http2_addr(address)
else:
server_credentials = _low.ServerCredentials(
self._root_certificates, self._key_chain_pairs)
self._server = _low.Server(self._completion_queue, server_credentials)
- port = self._server.add_secure_http2_addr(address)
+ self._port = self._server.add_secure_http2_addr(address)
self._server.start()
self._server.service(None)
@@ -288,11 +291,11 @@ class ForeLink(ticket_interfaces.ForeLink):
self._pool.submit(self._spin, self._completion_queue, self._server)
self._spinning = True
- return port
+ return self
# TODO(nathaniel): Expose graceful-shutdown semantics in which this object
# enters a state in which it finishes ongoing RPCs but refuses new ones.
- def stop(self):
+ def _stop(self):
"""Stops this ForeLink.
This method must be called for proper termination of this object, and no
@@ -301,7 +304,7 @@ class ForeLink(ticket_interfaces.ForeLink):
"""
with self._condition:
self._server.stop()
- # TODO(b/18904187): Yep, this is weird. Deleting a server shouldn't have a
+ # TODO(nathaniel): Yep, this is weird. Deleting a server shouldn't have a
# behaviorally significant side-effect.
self._server = None
self._completion_queue.stop()
@@ -309,6 +312,35 @@ class ForeLink(ticket_interfaces.ForeLink):
while self._spinning:
self._condition.wait()
+ self._port = None
+
+ def __enter__(self):
+ """See activated.Activated.__enter__ for specification."""
+ return self._start()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """See activated.Activated.__exit__ for specification."""
+ self._stop()
+ return False
+
+ def start(self):
+ """See activated.Activated.start for specification."""
+ return self._start()
+
+ def stop(self):
+ """See activated.Activated.stop for specification."""
+ self._stop()
+
+ def port(self):
+ """Identifies the port on which this ForeLink is servicing RPCs.
+
+ Returns:
+ The number of the port on which this ForeLink is servicing RPCs, or None
+ if this ForeLink is not currently activated and servicing RPCs.
+ """
+ with self._condition:
+ return self._port
+
def accept_back_to_front_ticket(self, ticket):
"""See ticket_interfaces.ForeLink.accept_back_to_front_ticket for spec."""
with self._condition:
diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py
index c47c0aa020..cbcf121d9a 100644
--- a/src/python/src/grpc/_adapter/rear.py
+++ b/src/python/src/grpc/_adapter/rear.py
@@ -39,6 +39,7 @@ from grpc._adapter import _low
from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets
+from grpc.framework.foundation import activated
_INVOCATION_EVENT_KINDS = (
_low.Event.Kind.METADATA_ACCEPTED,
@@ -84,7 +85,7 @@ def _write(operation_id, call, outstanding, write_state, serialized_payload):
raise ValueError('Write attempted after writes completed!')
-class RearLink(ticket_interfaces.RearLink):
+class RearLink(ticket_interfaces.RearLink, activated.Activated):
"""An invocation-side bridge between RPC Framework and the C-ish _low code."""
def __init__(
@@ -297,7 +298,7 @@ class RearLink(ticket_interfaces.RearLink):
with self._condition:
self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
- def start(self):
+ def _start(self):
"""Starts this RearLink.
This method must be called before attempting to exchange tickets with this
@@ -306,8 +307,9 @@ class RearLink(ticket_interfaces.RearLink):
with self._condition:
self._completion_queue = _low.CompletionQueue()
self._channel = _low.Channel('%s:%d' % (self._host, self._port))
+ return self
- def stop(self):
+ def _stop(self):
"""Stops this RearLink.
This method must be called for proper termination of this object, and no
@@ -321,6 +323,23 @@ class RearLink(ticket_interfaces.RearLink):
while self._spinning:
self._condition.wait()
+ def __enter__(self):
+ """See activated.Activated.__enter__ for specification."""
+ return self._start()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """See activated.Activated.__exit__ for specification."""
+ self._stop()
+ return False
+
+ def start(self):
+ """See activated.Activated.start for specification."""
+ return self._start()
+
+ def stop(self):
+ """See activated.Activated.stop for specification."""
+ self._stop()
+
def accept_front_to_back_ticket(self, ticket):
"""See ticket_interfaces.RearLink.accept_front_to_back_ticket for spec."""
with self._condition:
diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py
index c549317d15..1d76d0f9e0 100644
--- a/src/python/src/grpc/early_adopter/implementations.py
+++ b/src/python/src/grpc/early_adopter/implementations.py
@@ -70,7 +70,8 @@ class _Server(interfaces.Server):
self._pool, self._breakdown.request_deserializers,
self._breakdown.response_serializers, None,
((self._private_key, self._certificate_chain),), port=self._port)
- port = self._fore_link.start()
+ self._fore_link.start()
+ port = self._fore_link.port()
self._back = _tickets_implementations.back(
servicer, self._pool, self._pool, self._pool, _MEGA_TIMEOUT,
_MEGA_TIMEOUT)
diff --git a/src/python/src/grpc/framework/assembly/__init__.py b/src/python/src/grpc/framework/assembly/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/src/grpc/framework/assembly/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/src/grpc/framework/assembly/implementations.py b/src/python/src/grpc/framework/assembly/implementations.py
new file mode 100644
index 0000000000..461aa9c855
--- /dev/null
+++ b/src/python/src/grpc/framework/assembly/implementations.py
@@ -0,0 +1,305 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementations for assembling RPC framework values."""
+
+import threading
+
+from grpc.framework.assembly import interfaces
+from grpc.framework.base import util as base_utilities
+from grpc.framework.base.packets import implementations as tickets_implementations
+from grpc.framework.base.packets import interfaces as tickets_interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.face import implementations as face_implementations
+from grpc.framework.face import interfaces as face_interfaces
+from grpc.framework.face import utilities as face_utilities
+from grpc.framework.foundation import activated
+from grpc.framework.foundation import logging_pool
+
+_ONE_DAY_IN_SECONDS = 60 * 60 * 24
+_THREAD_POOL_SIZE = 100
+
+
+class _FaceStub(object):
+
+ def __init__(self, rear_link):
+ self._rear_link = rear_link
+ self._lock = threading.Lock()
+ self._pool = None
+ self._front = None
+ self._under_stub = None
+
+ def __enter__(self):
+ with self._lock:
+ self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ self._front = tickets_implementations.front(
+ self._pool, self._pool, self._pool)
+ self._rear_link.start()
+ self._rear_link.join_fore_link(self._front)
+ self._front.join_rear_link(self._rear_link)
+ self._under_stub = face_implementations.stub(self._front, self._pool)
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ with self._lock:
+ self._under_stub = None
+ self._rear_link.stop()
+ base_utilities.wait_for_idle(self._front)
+ self._front = None
+ self._pool.shutdown(wait=True)
+ self._pool = None
+ return False
+
+ def __getattr__(self, attr):
+ with self._lock:
+ if self._under_stub is None:
+ raise ValueError('Called out of context!')
+ else:
+ return getattr(self._under_stub, attr)
+
+
+def _behaviors(implementations, front, pool):
+ behaviors = {}
+ stub = face_implementations.stub(front, pool)
+ for name, implementation in implementations.iteritems():
+ if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+ behaviors[name] = stub.unary_unary_sync_async(name)
+ elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+ behaviors[name] = lambda request, context, bound_name=name: (
+ stub.inline_value_in_stream_out(bound_name, request, context))
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+ behaviors[name] = stub.stream_unary_sync_async(name)
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+ behaviors[name] = lambda request_iterator, context, bound_name=name: (
+ stub.inline_stream_in_stream_out(
+ bound_name, request_iterator, context))
+ return behaviors
+
+
+class _DynamicInlineStub(object):
+
+ def __init__(self, implementations, rear_link):
+ self._implementations = implementations
+ self._rear_link = rear_link
+ self._lock = threading.Lock()
+ self._pool = None
+ self._front = None
+ self._behaviors = None
+
+ def __enter__(self):
+ with self._lock:
+ self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ self._front = tickets_implementations.front(
+ self._pool, self._pool, self._pool)
+ self._rear_link.start()
+ self._rear_link.join_fore_link(self._front)
+ self._front.join_rear_link(self._rear_link)
+ self._behaviors = _behaviors(
+ self._implementations, self._front, self._pool)
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ with self._lock:
+ self._behaviors = None
+ self._rear_link.stop()
+ base_utilities.wait_for_idle(self._front)
+ self._front = None
+ self._pool.shutdown(wait=True)
+ self._pool = None
+ return False
+
+ def __getattr__(self, attr):
+ with self._lock:
+ behavior = self._behaviors.get(attr)
+ if behavior is None:
+ raise AttributeError(attr)
+ else:
+ return behavior
+
+
+def _servicer(implementations, pool):
+ inline_value_in_value_out_methods = {}
+ inline_value_in_stream_out_methods = {}
+ inline_stream_in_value_out_methods = {}
+ inline_stream_in_stream_out_methods = {}
+ event_value_in_value_out_methods = {}
+ event_value_in_stream_out_methods = {}
+ event_stream_in_value_out_methods = {}
+ event_stream_in_stream_out_methods = {}
+
+ for name, implementation in implementations.iteritems():
+ if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+ if implementation.style is style.Service.INLINE:
+ inline_value_in_value_out_methods[name] = (
+ face_utilities.inline_unary_unary_method(implementation.unary_unary_inline))
+ elif implementation.style is style.Service.EVENT:
+ event_value_in_value_out_methods[name] = (
+ face_utilities.event_unary_unary_method(implementation.unary_unary_event))
+ elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+ if implementation.style is style.Service.INLINE:
+ inline_value_in_stream_out_methods[name] = (
+ face_utilities.inline_unary_stream_method(implementation.unary_stream_inline))
+ elif implementation.style is style.Service.EVENT:
+ event_value_in_stream_out_methods[name] = (
+ face_utilities.event_unary_stream_method(implementation.unary_stream_event))
+ if implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+ if implementation.style is style.Service.INLINE:
+ inline_stream_in_value_out_methods[name] = (
+ face_utilities.inline_stream_unary_method(implementation.stream_unary_inline))
+ elif implementation.style is style.Service.EVENT:
+ event_stream_in_value_out_methods[name] = (
+ face_utilities.event_stream_unary_method(implementation.stream_unary_event))
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+ if implementation.style is style.Service.INLINE:
+ inline_stream_in_stream_out_methods[name] = (
+ face_utilities.inline_stream_stream_method(implementation.stream_stream_inline))
+ elif implementation.style is style.Service.EVENT:
+ event_stream_in_stream_out_methods[name] = (
+ face_utilities.event_stream_stream_method(implementation.stream_stream_event))
+
+ return face_implementations.servicer(
+ pool,
+ inline_value_in_value_out_methods=inline_value_in_value_out_methods,
+ inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
+ inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
+ inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
+ event_value_in_value_out_methods=event_value_in_value_out_methods,
+ event_value_in_stream_out_methods=event_value_in_stream_out_methods,
+ event_stream_in_value_out_methods=event_stream_in_value_out_methods,
+ event_stream_in_stream_out_methods=event_stream_in_stream_out_methods)
+
+
+class _ServiceAssembly(activated.Activated):
+
+ def __init__(self, implementations, fore_link):
+ self._implementations = implementations
+ self._fore_link = fore_link
+ self._lock = threading.Lock()
+ self._pool = None
+ self._back = None
+
+ def _start(self):
+ with self._lock:
+ self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ servicer = _servicer(self._implementations, self._pool)
+ self._back = tickets_implementations.back(
+ servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS,
+ _ONE_DAY_IN_SECONDS)
+ self._fore_link.start()
+ self._fore_link.join_rear_link(self._back)
+ self._back.join_fore_link(self._fore_link)
+
+ def _stop(self):
+ with self._lock:
+ self._fore_link.stop()
+ base_utilities.wait_for_idle(self._back)
+ self._back = None
+ self._pool.shutdown(wait=True)
+ self._pool = None
+
+ def __enter__(self):
+ self._start()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._stop()
+ return False
+
+ def start(self):
+ return self._start()
+
+ def stop(self):
+ self._stop()
+
+
+def assemble_face_stub(activated_rear_link):
+ """Assembles a face_interfaces.Stub.
+
+ The returned object is a context manager and may only be used in context to
+ invoke RPCs.
+
+ Args:
+ activated_rear_link: An object that is both a tickets_interfaces.RearLink
+ and an activated.Activated. The object should be in the inactive state
+ when passed to this method.
+
+ Returns:
+ A face_interfaces.Stub on which, in context, RPCs can be invoked.
+ """
+ return _FaceStub(activated_rear_link)
+
+
+def assemble_dynamic_inline_stub(implementations, activated_rear_link):
+ """Assembles a stub with method names for attributes.
+
+ The returned object is a context manager and may only be used in context to
+ invoke RPCs.
+
+ The returned object, when used in context, will respond to attribute access
+ as follows: if the requested attribute is the name of a unary-unary RPC
+ method, the value of the attribute will be a
+ face_interfaces.UnaryUnarySyncAsync with which to invoke the RPC method. If
+ the requested attribute is the name of a unary-stream RPC method, the value
+ of the attribute will be a callable with the semantics of
+ face_interfaces.Stub.inline_value_in_stream_out, minus the "name" parameter,
+ with which to invoke the RPC method. If the requested attribute is the name
+ of a stream-unary RPC method, the value of the attribute will be a
+ face_interfaces.StreamUnarySyncAsync with which to invoke the RPC method. If
+ the requested attribute is the name of a stream-stream RPC method, the value
+ of the attribute will be a callable with the semantics of
+ face_interfaces.Stub.inline_stream_in_stream_out, minus the "name" parameter,
+ with which to invoke the RPC method.
+
+ Args:
+ implementations: A dictionary from RPC method name to
+ interfaces.MethodImplementation.
+ activated_rear_link: An object that is both a tickets_interfaces.RearLink
+ and an activated.Activated. The object should be in the inactive state
+ when passed to this method.
+
+ Returns:
+ A stub on which, in context, RPCs can be invoked.
+ """
+ return _DynamicInlineStub(implementations, activated_rear_link)
+
+
+def assemble_service(implementations, activated_fore_link):
+ """Assembles the service-side of the RPC Framework stack.
+
+ Args:
+ implementations: A dictionary from RPC method name to
+ interfaces.MethodImplementation.
+ activated_fore_link: An object that is both a tickets_interfaces.ForeLink
+ and an activated.Activated. The object should be in the inactive state
+ when passed to this method.
+
+ Returns:
+ An activated.Activated value encapsulating RPC service.
+ """
+ return _ServiceAssembly(implementations, activated_fore_link)
diff --git a/src/python/src/grpc/framework/assembly/implementations_test.py b/src/python/src/grpc/framework/assembly/implementations_test.py
new file mode 100644
index 0000000000..74dc02ed83
--- /dev/null
+++ b/src/python/src/grpc/framework/assembly/implementations_test.py
@@ -0,0 +1,284 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# TODO(nathaniel): Expand this test coverage.
+
+"""Test of the GRPC-backed ForeLink and RearLink."""
+
+import threading
+import unittest
+
+from grpc.framework.assembly import implementations
+from grpc.framework.assembly import utilities
+from grpc.framework.base import interfaces
+from grpc.framework.base.packets import packets as tickets
+from grpc.framework.base.packets import interfaces as tickets_interfaces
+from grpc.framework.base.packets import null
+from grpc.framework.foundation import logging_pool
+from grpc._junkdrawer import math_pb2
+
+DIV = 'Div'
+DIV_MANY = 'DivMany'
+FIB = 'Fib'
+SUM = 'Sum'
+
+def _fibbonacci(limit):
+ left, right = 0, 1
+ for _ in xrange(limit):
+ yield left
+ left, right = right, left + right
+
+
+def _div(request, unused_context):
+ return math_pb2.DivReply(
+ quotient=request.dividend / request.divisor,
+ remainder=request.dividend % request.divisor)
+
+
+def _div_many(request_iterator, unused_context):
+ for request in request_iterator:
+ yield math_pb2.DivReply(
+ quotient=request.dividend / request.divisor,
+ remainder=request.dividend % request.divisor)
+
+
+def _fib(request, unused_context):
+ for number in _fibbonacci(request.limit):
+ yield math_pb2.Num(num=number)
+
+
+def _sum(request_iterator, unused_context):
+ accumulation = 0
+ for request in request_iterator:
+ accumulation += request.num
+ return math_pb2.Num(num=accumulation)
+
+
+_IMPLEMENTATIONS = {
+ DIV: utilities.unary_unary_inline(_div),
+ DIV_MANY: utilities.stream_stream_inline(_div_many),
+ FIB: utilities.unary_stream_inline(_fib),
+ SUM: utilities.stream_unary_inline(_sum),
+}
+
+_TIMEOUT = 10
+
+
+class PipeLink(tickets_interfaces.ForeLink, tickets_interfaces.RearLink):
+
+ def __init__(self):
+ self._fore_lock = threading.Lock()
+ self._fore_link = null.NULL_FORE_LINK
+ self._rear_lock = threading.Lock()
+ self._rear_link = null.NULL_REAR_LINK
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return False
+
+ def start(self):
+ pass
+
+ def stop(self):
+ pass
+
+ def accept_back_to_front_ticket(self, ticket):
+ with self._fore_lock:
+ self._fore_link.accept_back_to_front_ticket(ticket)
+
+ def join_rear_link(self, rear_link):
+ with self._rear_lock:
+ self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
+
+ def accept_front_to_back_ticket(self, ticket):
+ with self._rear_lock:
+ self._rear_link.accept_front_to_back_ticket(ticket)
+
+ def join_fore_link(self, fore_link):
+ with self._fore_lock:
+ self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
+
+
+class FaceStubTest(unittest.TestCase):
+
+ def testUnaryUnary(self):
+ divisor = 7
+ dividend = 13
+ expected_quotient = 1
+ expected_remainder = 6
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ face_stub = implementations.assemble_face_stub(pipe)
+
+ service.start()
+ try:
+ with face_stub:
+ response = face_stub.blocking_value_in_value_out(
+ DIV, math_pb2.DivArgs(divisor=divisor, dividend=dividend),
+ _TIMEOUT)
+ self.assertEqual(expected_quotient, response.quotient)
+ self.assertEqual(expected_remainder, response.remainder)
+ finally:
+ service.stop()
+
+ def testUnaryStream(self):
+ stream_length = 29
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ face_stub = implementations.assemble_face_stub(pipe)
+
+ with service, face_stub:
+ responses = list(
+ face_stub.inline_value_in_stream_out(
+ FIB, math_pb2.FibArgs(limit=stream_length), _TIMEOUT))
+ numbers = [response.num for response in responses]
+ for early, middle, later in zip(numbers, numbers[1:], numbers[2:]):
+ self.assertEqual(early + middle, later)
+
+ def testStreamUnary(self):
+ stream_length = 13
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ face_stub = implementations.assemble_face_stub(pipe)
+
+ with service, face_stub:
+ sync_async = face_stub.stream_unary_sync_async(SUM)
+ response_future = sync_async.async(
+ (math_pb2.Num(num=index) for index in range(stream_length)),
+ _TIMEOUT)
+ self.assertEqual(
+ (stream_length * (stream_length - 1)) / 2,
+ response_future.result().num)
+
+ def testStreamStream(self):
+ stream_length = 17
+ divisor_offset = 7
+ dividend_offset = 17
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ face_stub = implementations.assemble_face_stub(pipe)
+
+ with service, face_stub:
+ response_iterator = face_stub.inline_stream_in_stream_out(
+ DIV_MANY,
+ (math_pb2.DivArgs(
+ divisor=divisor_offset + index,
+ dividend=dividend_offset + index)
+ for index in range(stream_length)),
+ _TIMEOUT)
+ for index, response in enumerate(response_iterator):
+ self.assertEqual(
+ (dividend_offset + index) / (divisor_offset + index),
+ response.quotient)
+ self.assertEqual(
+ (dividend_offset + index) % (divisor_offset + index),
+ response.remainder)
+ self.assertEqual(stream_length, index + 1)
+
+
+class DynamicInlineStubTest(unittest.TestCase):
+
+ def testUnaryUnary(self):
+ divisor = 59
+ dividend = 973
+ expected_quotient = dividend / divisor
+ expected_remainder = dividend % divisor
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ dynamic_stub = implementations.assemble_dynamic_inline_stub(
+ _IMPLEMENTATIONS, pipe)
+
+ service.start()
+ with dynamic_stub:
+ response = dynamic_stub.Div(
+ math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT)
+ self.assertEqual(expected_quotient, response.quotient)
+ self.assertEqual(expected_remainder, response.remainder)
+ service.stop()
+
+ def testUnaryStream(self):
+ stream_length = 43
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ dynamic_stub = implementations.assemble_dynamic_inline_stub(
+ _IMPLEMENTATIONS, pipe)
+
+ with service, dynamic_stub:
+ response_iterator = dynamic_stub.Fib(
+ math_pb2.FibArgs(limit=stream_length), _TIMEOUT)
+ numbers = tuple(response.num for response in response_iterator)
+ for early, middle, later in zip(numbers, numbers[:1], numbers[:2]):
+ self.assertEqual(early + middle, later)
+ self.assertEqual(stream_length, len(numbers))
+
+ def testStreamUnary(self):
+ stream_length = 127
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ dynamic_stub = implementations.assemble_dynamic_inline_stub(
+ _IMPLEMENTATIONS, pipe)
+
+ with service, dynamic_stub:
+ response_future = dynamic_stub.Sum.async(
+ (math_pb2.Num(num=index) for index in range(stream_length)),
+ _TIMEOUT)
+ self.assertEqual(
+ (stream_length * (stream_length - 1)) / 2,
+ response_future.result().num)
+
+ def testStreamStream(self):
+ stream_length = 179
+ divisor_offset = 71
+ dividend_offset = 1763
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ dynamic_stub = implementations.assemble_dynamic_inline_stub(
+ _IMPLEMENTATIONS, pipe)
+
+ with service, dynamic_stub:
+ response_iterator = dynamic_stub.DivMany(
+ (math_pb2.DivArgs(
+ divisor=divisor_offset + index,
+ dividend=dividend_offset + index)
+ for index in range(stream_length)),
+ _TIMEOUT)
+ for index, response in enumerate(response_iterator):
+ self.assertEqual(
+ (dividend_offset + index) / (divisor_offset + index),
+ response.quotient)
+ self.assertEqual(
+ (dividend_offset + index) % (divisor_offset + index),
+ response.remainder)
+ self.assertEqual(stream_length, index + 1)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/python/src/grpc/framework/assembly/interfaces.py b/src/python/src/grpc/framework/assembly/interfaces.py
new file mode 100644
index 0000000000..e5d750b2bc
--- /dev/null
+++ b/src/python/src/grpc/framework/assembly/interfaces.py
@@ -0,0 +1,91 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# TODO(nathaniel): The assembly layer only exists to smooth out wrinkles in
+# the face layer. The two should be squashed together as soon as manageable.
+"""Interfaces for assembling RPC Framework values."""
+
+import abc
+
+# cardinality, style, and stream are referenced from specification in this
+# module.
+from grpc.framework.common import cardinality # pylint: disable=unused-import
+from grpc.framework.common import style # pylint: disable=unused-import
+from grpc.framework.foundation import stream # pylint: disable=unused-import
+
+
+class MethodImplementation(object):
+ """A sum type that describes an RPC method implementation.
+
+ Attributes:
+ cardinality: A cardinality.Cardinality value.
+ style: A style.Service value.
+ unary_unary_inline: The implementation of the RPC method as a callable
+ value that takes a request value and a face_interfaces.RpcContext object
+ and returns a response value. Only non-None if cardinality is
+ cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE.
+ unary_stream_inline: The implementation of the RPC method as a callable
+ value that takes a request value and a face_interfaces.RpcContext object
+ and returns an iterator of response values. Only non-None if cardinality
+ is cardinality.Cardinality.UNARY_STREAM and style is
+ style.Service.INLINE.
+ stream_unary_inline: The implementation of the RPC method as a callable
+ value that takes an iterator of request values and a
+ face_interfaces.RpcContext object and returns a response value. Only
+ non-None if cardinality is cardinality.Cardinality.STREAM_UNARY and style
+ is style.Service.INLINE.
+ stream_stream_inline: The implementation of the RPC method as a callable
+ value that takes an iterator of request values and a
+ face_interfaces.RpcContext object and returns an iterator of response
+ values. Only non-None if cardinality is
+ cardinality.Cardinality.STREAM_STREAM and style is style.Service.INLINE.
+ unary_unary_event: The implementation of the RPC method as a callable value
+ that takes a request value, a response callback to which to pass the
+ response value of the RPC, and a face_interfaces.RpcContext. Only
+ non-None if cardinality is cardinality.Cardinality.UNARY_UNARY and style
+ is style.Service.EVENT.
+ unary_stream_event: The implementation of the RPC method as a callable
+ value that takes a request value, a stream.Consumer to which to pass the
+ the response values of the RPC, and a face_interfaces.RpcContext. Only
+ non-None if cardinality is cardinality.Cardinality.UNARY_STREAM and style
+ is style.Service.EVENT.
+ stream_unary_event: The implementation of the RPC method as a callable
+ value that takes a response callback to which to pass the response value
+ of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
+ to which the request values of the RPC should be passed. Only non-None if
+ cardinality is cardinality.Cardinality.STREAM_UNARY and style is
+ style.Service.EVENT.
+ stream_stream_event: The implementation of the RPC method as a callable
+ value that takes a stream.Consumer to which to pass the response values
+ of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
+ to which the request values of the RPC should be passed. Only non-None if
+ cardinality is cardinality.Cardinality.STREAM_STREAM and style is
+ style.Service.EVENT.
+ """
+ __metaclass__ = abc.ABCMeta
diff --git a/src/python/src/grpc/framework/assembly/utilities.py b/src/python/src/grpc/framework/assembly/utilities.py
new file mode 100644
index 0000000000..80e7f59c03
--- /dev/null
+++ b/src/python/src/grpc/framework/assembly/utilities.py
@@ -0,0 +1,179 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for assembling RPC framework values."""
+
+import collections
+
+from grpc.framework.assembly import interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.face import interfaces as face_interfaces
+from grpc.framework.foundation import stream
+
+
+class _MethodImplementation(
+ interfaces.MethodImplementation,
+ collections.namedtuple(
+ '_MethodImplementation',
+ ['cardinality', 'style', 'unary_unary_inline', 'unary_stream_inline',
+ 'stream_unary_inline', 'stream_stream_inline', 'unary_unary_event',
+ 'unary_stream_event', 'stream_unary_event', 'stream_stream_event',])):
+ pass
+
+
+def unary_unary_inline(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-unary RPC method as a callable value
+ that takes a request value and a face_interfaces.RpcContext object and
+ returns a response value.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_UNARY, style.Service.INLINE, behavior,
+ None, None, None, None, None, None, None)
+
+
+def unary_stream_inline(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-stream RPC method as a callable
+ value that takes a request value and a face_interfaces.RpcContext object
+ and returns an iterator of response values.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_STREAM, style.Service.INLINE, None,
+ behavior, None, None, None, None, None, None)
+
+
+def stream_unary_inline(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-unary RPC method as a callable
+ value that takes an iterator of request values and a
+ face_interfaces.RpcContext object and returns a response value.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_UNARY, style.Service.INLINE, None, None,
+ behavior, None, None, None, None, None)
+
+
+def stream_stream_inline(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-stream RPC method as a callable
+ value that takes an iterator of request values and a
+ face_interfaces.RpcContext object and returns an iterator of response
+ values.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_STREAM, style.Service.INLINE, None, None,
+ None, behavior, None, None, None, None)
+
+
+def unary_unary_event(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-unary RPC method as a callable
+ value that takes a request value, a response callback to which to pass
+ the response value of the RPC, and a face_interfaces.RpcContext.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_UNARY, style.Service.EVENT, None, None,
+ None, None, behavior, None, None, None)
+
+
+def unary_stream_event(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-stream RPC method as a callable
+ value that takes a request value, a stream.Consumer to which to pass the
+ the response values of the RPC, and a face_interfaces.RpcContext.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_STREAM, style.Service.EVENT, None, None,
+ None, None, None, behavior, None, None)
+
+
+def stream_unary_event(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-unary RPC method as a callable
+ value that takes a response callback to which to pass the response value
+ of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
+ to which the request values of the RPC should be passed.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_UNARY, style.Service.EVENT, None, None,
+ None, None, None, None, behavior, None)
+
+
+def stream_stream_event(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-stream RPC method as a callable
+ value that takes a stream.Consumer to which to pass the response values
+ of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
+ to which the request values of the RPC should be passed.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_STREAM, style.Service.EVENT, None, None,
+ None, None, None, None, None, behavior)
diff --git a/src/python/src/grpc/framework/common/style.py b/src/python/src/grpc/framework/common/style.py
new file mode 100644
index 0000000000..6ae694bdcb
--- /dev/null
+++ b/src/python/src/grpc/framework/common/style.py
@@ -0,0 +1,40 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Defines an enum for classifying RPC methods by control flow semantics."""
+
+import enum
+
+
+@enum.unique
+class Service(enum.Enum):
+ """Describes the control flow style of RPC method implementation."""
+
+ INLINE = 'inline'
+ EVENT = 'event'
diff --git a/src/python/src/grpc/framework/face/utilities.py b/src/python/src/grpc/framework/face/utilities.py
new file mode 100644
index 0000000000..5e34be37da
--- /dev/null
+++ b/src/python/src/grpc/framework/face/utilities.py
@@ -0,0 +1,221 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for the face layer of RPC Framework."""
+
+# stream is referenced from specification in this module.
+from grpc.framework.face import interfaces
+from grpc.framework.foundation import stream # pylint: disable=unused-import
+
+
+class _InlineUnaryUnaryMethod(interfaces.InlineValueInValueOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request, context):
+ return self._behavior(request, context)
+
+
+class _InlineUnaryStreamMethod(interfaces.InlineValueInStreamOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request, context):
+ return self._behavior(request, context)
+
+
+class _InlineStreamUnaryMethod(interfaces.InlineStreamInValueOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request_iterator, context):
+ return self._behavior(request_iterator, context)
+
+
+class _InlineStreamStreamMethod(interfaces.InlineStreamInStreamOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request_iterator, context):
+ return self._behavior(request_iterator, context)
+
+
+class _EventUnaryUnaryMethod(interfaces.EventValueInValueOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request, response_callback, context):
+ return self._behavior(request, response_callback, context)
+
+
+class _EventUnaryStreamMethod(interfaces.EventValueInStreamOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request, response_consumer, context):
+ return self._behavior(request, response_consumer, context)
+
+
+class _EventStreamUnaryMethod(interfaces.EventStreamInValueOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, response_callback, context):
+ return self._behavior(response_callback, context)
+
+
+class _EventStreamStreamMethod(interfaces.EventStreamInStreamOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, response_consumer, context):
+ return self._behavior(response_consumer, context)
+
+
+def inline_unary_unary_method(behavior):
+ """Creates an interfaces.InlineValueInValueOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a unary-unary RPC method as a callable
+ value that takes a request value and an interfaces.RpcContext object and
+ returns a response value.
+
+ Returns:
+ An interfaces.InlineValueInValueOutMethod derived from the given behavior.
+ """
+ return _InlineUnaryUnaryMethod(behavior)
+
+
+def inline_unary_stream_method(behavior):
+ """Creates an interfaces.InlineValueInStreamOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a unary-stream RPC method as a callable
+ value that takes a request value and an interfaces.RpcContext object and
+ returns an iterator of response values.
+
+ Returns:
+ An interfaces.InlineValueInStreamOutMethod derived from the given behavior.
+ """
+ return _InlineUnaryStreamMethod(behavior)
+
+
+def inline_stream_unary_method(behavior):
+ """Creates an interfaces.InlineStreamInValueOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a stream-unary RPC method as a callable
+ value that takes an iterator of request values and an
+ interfaces.RpcContext object and returns a response value.
+
+ Returns:
+ An interfaces.InlineStreamInValueOutMethod derived from the given behavior.
+ """
+ return _InlineStreamUnaryMethod(behavior)
+
+
+def inline_stream_stream_method(behavior):
+ """Creates an interfaces.InlineStreamInStreamOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a stream-stream RPC method as a callable
+ value that takes an iterator of request values and an
+ interfaces.RpcContext object and returns an iterator of response values.
+
+ Returns:
+ An interfaces.InlineStreamInStreamOutMethod derived from the given
+ behavior.
+ """
+ return _InlineStreamStreamMethod(behavior)
+
+
+def event_unary_unary_method(behavior):
+ """Creates an interfaces.EventValueInValueOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a unary-unary RPC method as a callable
+ value that takes a request value, a response callback to which to pass
+ the response value of the RPC, and an interfaces.RpcContext.
+
+ Returns:
+ An interfaces.EventValueInValueOutMethod derived from the given behavior.
+ """
+ return _EventUnaryUnaryMethod(behavior)
+
+
+def event_unary_stream_method(behavior):
+ """Creates an interfaces.EventValueInStreamOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a unary-stream RPC method as a callable
+ value that takes a request value, a stream.Consumer to which to pass the
+ response values of the RPC, and an interfaces.RpcContext.
+
+ Returns:
+ An interfaces.EventValueInStreamOutMethod derived from the given behavior.
+ """
+ return _EventUnaryStreamMethod(behavior)
+
+
+def event_stream_unary_method(behavior):
+ """Creates an interfaces.EventStreamInValueOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a stream-unary RPC method as a callable
+ value that takes a response callback to which to pass the response value
+ of the RPC and an interfaces.RpcContext and returns a stream.Consumer to
+ which the request values of the RPC should be passed.
+
+ Returns:
+ An interfaces.EventStreamInValueOutMethod derived from the given behavior.
+ """
+ return _EventStreamUnaryMethod(behavior)
+
+
+def event_stream_stream_method(behavior):
+ """Creates an interfaces.EventStreamInStreamOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a stream-stream RPC method as a callable
+ value that takes a stream.Consumer to which to pass the response values
+ of the RPC and an interfaces.RpcContext and returns a stream.Consumer to
+ which the request values of the RPC should be passed.
+
+ Returns:
+ An interfaces.EventStreamInStreamOutMethod derived from the given behavior.
+ """
+ return _EventStreamStreamMethod(behavior)
diff --git a/src/python/src/grpc/framework/foundation/activated.py b/src/python/src/grpc/framework/foundation/activated.py
new file mode 100644
index 0000000000..426a71c705
--- /dev/null
+++ b/src/python/src/grpc/framework/foundation/activated.py
@@ -0,0 +1,65 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces related to streams of values or objects."""
+
+import abc
+
+
+class Activated(object):
+ """Interface for objects that may be started and stopped.
+
+ Values implementing this type must also implement the context manager
+ protocol.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __enter__(self):
+ """See the context manager protocol for specification."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """See the context manager protocol for specification."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def start(self):
+ """Activates this object.
+
+ Returns:
+ A value equal to the value returned by this object's __enter__ method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stop(self):
+ """Deactivates this object."""
+ raise NotImplementedError()
diff --git a/src/python/src/setup.py b/src/python/src/setup.py
index 8e33ebb31c..e3f13fa5c8 100644
--- a/src/python/src/setup.py
+++ b/src/python/src/setup.py
@@ -56,12 +56,13 @@ _EXTENSION_MODULE = _core.Extension(
libraries=_EXTENSION_LIBRARIES,
)
-_PACKAGES=(
+_PACKAGES = (
'grpc',
'grpc._adapter',
'grpc._junkdrawer',
'grpc.early_adopter',
'grpc.framework',
+ 'grpc.framework.assembly',
'grpc.framework.base',
'grpc.framework.base.packets',
'grpc.framework.common',