aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-02-20 15:54:46 -0800
committerGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-02-20 15:54:46 -0800
commit04608c41345bf9b75f0153ff3ee5cd1b0e106ffc (patch)
treeec2382f569acc07e4692414fe6714a6661d867f1 /src
parent98cb4767cdf5d9b7e83ccd039ef3c6206d87d1ca (diff)
parent7efe54e3367ead56d46f83efece67685ad03fd05 (diff)
Merge pull request #668 from nathanielmanistaatgoogle/assembly
The framework.assembly package.
Diffstat (limited to 'src')
-rw-r--r--src/python/src/grpc/_adapter/_face_test_case.py3
-rw-r--r--src/python/src/grpc/_adapter/_links_test.py9
-rw-r--r--src/python/src/grpc/_adapter/fore.py50
-rw-r--r--src/python/src/grpc/_adapter/rear.py25
-rw-r--r--src/python/src/grpc/early_adopter/implementations.py3
-rw-r--r--src/python/src/grpc/framework/assembly/__init__.py30
-rw-r--r--src/python/src/grpc/framework/assembly/implementations.py305
-rw-r--r--src/python/src/grpc/framework/assembly/implementations_test.py284
-rw-r--r--src/python/src/grpc/framework/assembly/interfaces.py91
-rw-r--r--src/python/src/grpc/framework/assembly/utilities.py179
-rw-r--r--src/python/src/grpc/framework/common/style.py40
-rw-r--r--src/python/src/grpc/framework/face/utilities.py221
-rw-r--r--src/python/src/grpc/framework/foundation/activated.py65
-rw-r--r--src/python/src/setup.py3
14 files changed, 1290 insertions, 18 deletions
diff --git a/src/python/src/grpc/_adapter/_face_test_case.py b/src/python/src/grpc/_adapter/_face_test_case.py
index da73366f44..8cce322d30 100644
--- a/src/python/src/grpc/_adapter/_face_test_case.py
+++ b/src/python/src/grpc/_adapter/_face_test_case.py
@@ -81,7 +81,8 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage):
fore_link = fore.ForeLink(
pool, serialization.request_deserializers,
serialization.response_serializers, None, ())
- port = fore_link.start()
+ fore_link.start()
+ port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, pool,
serialization.request_serializers, serialization.response_deserializers)
diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py
index ba7660bb92..6b3bcee9fa 100644
--- a/src/python/src/grpc/_adapter/_links_test.py
+++ b/src/python/src/grpc/_adapter/_links_test.py
@@ -70,7 +70,8 @@ class RoundTripTest(unittest.TestCase):
self.fore_link_pool, {test_method: None}, {test_method: None}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
- port = fore_link.start()
+ fore_link.start()
+ port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool, {test_method: None},
@@ -123,7 +124,8 @@ class RoundTripTest(unittest.TestCase):
{test_method: _IDENTITY}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
- port = fore_link.start()
+ fore_link.start()
+ port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool, {test_method: _IDENTITY},
@@ -185,7 +187,8 @@ class RoundTripTest(unittest.TestCase):
{test_method: scenario.serialize_response}, None, ())
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
- port = fore_link.start()
+ fore_link.start()
+ port = fore_link.port()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool,
diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py
index f72b2fd5a5..051fc083f1 100644
--- a/src/python/src/grpc/_adapter/fore.py
+++ b/src/python/src/grpc/_adapter/fore.py
@@ -40,6 +40,7 @@ from grpc.framework.base import interfaces
from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets
+from grpc.framework.foundation import activated
@enum.unique
@@ -65,7 +66,7 @@ def _status(call, rpc_state):
rpc_state.write.low = _LowWrite.CLOSED
-class ForeLink(ticket_interfaces.ForeLink):
+class ForeLink(ticket_interfaces.ForeLink, activated.Activated):
"""A service-side bridge between RPC Framework and the C-ish _low code."""
def __init__(
@@ -92,13 +93,14 @@ class ForeLink(ticket_interfaces.ForeLink):
self._response_serializers = response_serializers
self._root_certificates = root_certificates
self._key_chain_pairs = key_chain_pairs
- self._port = port
+ self._requested_port = port
self._rear_link = null.NULL_REAR_LINK
self._completion_queue = None
self._server = None
self._rpc_states = {}
self._spinning = False
+ self._port = None
def _on_stop_event(self):
self._spinning = False
@@ -264,23 +266,24 @@ class ForeLink(ticket_interfaces.ForeLink):
"""See ticket_interfaces.ForeLink.join_rear_link for specification."""
self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
- def start(self):
+ def _start(self):
"""Starts this ForeLink.
This method must be called before attempting to exchange tickets with this
object.
"""
with self._condition:
- address = '[::]:%d' % (0 if self._port is None else self._port)
+ address = '[::]:%d' % (
+ 0 if self._requested_port is None else self._requested_port)
self._completion_queue = _low.CompletionQueue()
if self._root_certificates is None and not self._key_chain_pairs:
self._server = _low.Server(self._completion_queue, None)
- port = self._server.add_http2_addr(address)
+ self._port = self._server.add_http2_addr(address)
else:
server_credentials = _low.ServerCredentials(
self._root_certificates, self._key_chain_pairs)
self._server = _low.Server(self._completion_queue, server_credentials)
- port = self._server.add_secure_http2_addr(address)
+ self._port = self._server.add_secure_http2_addr(address)
self._server.start()
self._server.service(None)
@@ -288,11 +291,11 @@ class ForeLink(ticket_interfaces.ForeLink):
self._pool.submit(self._spin, self._completion_queue, self._server)
self._spinning = True
- return port
+ return self
# TODO(nathaniel): Expose graceful-shutdown semantics in which this object
# enters a state in which it finishes ongoing RPCs but refuses new ones.
- def stop(self):
+ def _stop(self):
"""Stops this ForeLink.
This method must be called for proper termination of this object, and no
@@ -301,7 +304,7 @@ class ForeLink(ticket_interfaces.ForeLink):
"""
with self._condition:
self._server.stop()
- # TODO(b/18904187): Yep, this is weird. Deleting a server shouldn't have a
+ # TODO(nathaniel): Yep, this is weird. Deleting a server shouldn't have a
# behaviorally significant side-effect.
self._server = None
self._completion_queue.stop()
@@ -309,6 +312,35 @@ class ForeLink(ticket_interfaces.ForeLink):
while self._spinning:
self._condition.wait()
+ self._port = None
+
+ def __enter__(self):
+ """See activated.Activated.__enter__ for specification."""
+ return self._start()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """See activated.Activated.__exit__ for specification."""
+ self._stop()
+ return False
+
+ def start(self):
+ """See activated.Activated.start for specification."""
+ return self._start()
+
+ def stop(self):
+ """See activated.Activated.stop for specification."""
+ self._stop()
+
+ def port(self):
+ """Identifies the port on which this ForeLink is servicing RPCs.
+
+ Returns:
+ The number of the port on which this ForeLink is servicing RPCs, or None
+ if this ForeLink is not currently activated and servicing RPCs.
+ """
+ with self._condition:
+ return self._port
+
def accept_back_to_front_ticket(self, ticket):
"""See ticket_interfaces.ForeLink.accept_back_to_front_ticket for spec."""
with self._condition:
diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py
index c47c0aa020..cbcf121d9a 100644
--- a/src/python/src/grpc/_adapter/rear.py
+++ b/src/python/src/grpc/_adapter/rear.py
@@ -39,6 +39,7 @@ from grpc._adapter import _low
from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets
+from grpc.framework.foundation import activated
_INVOCATION_EVENT_KINDS = (
_low.Event.Kind.METADATA_ACCEPTED,
@@ -84,7 +85,7 @@ def _write(operation_id, call, outstanding, write_state, serialized_payload):
raise ValueError('Write attempted after writes completed!')
-class RearLink(ticket_interfaces.RearLink):
+class RearLink(ticket_interfaces.RearLink, activated.Activated):
"""An invocation-side bridge between RPC Framework and the C-ish _low code."""
def __init__(
@@ -297,7 +298,7 @@ class RearLink(ticket_interfaces.RearLink):
with self._condition:
self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
- def start(self):
+ def _start(self):
"""Starts this RearLink.
This method must be called before attempting to exchange tickets with this
@@ -306,8 +307,9 @@ class RearLink(ticket_interfaces.RearLink):
with self._condition:
self._completion_queue = _low.CompletionQueue()
self._channel = _low.Channel('%s:%d' % (self._host, self._port))
+ return self
- def stop(self):
+ def _stop(self):
"""Stops this RearLink.
This method must be called for proper termination of this object, and no
@@ -321,6 +323,23 @@ class RearLink(ticket_interfaces.RearLink):
while self._spinning:
self._condition.wait()
+ def __enter__(self):
+ """See activated.Activated.__enter__ for specification."""
+ return self._start()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """See activated.Activated.__exit__ for specification."""
+ self._stop()
+ return False
+
+ def start(self):
+ """See activated.Activated.start for specification."""
+ return self._start()
+
+ def stop(self):
+ """See activated.Activated.stop for specification."""
+ self._stop()
+
def accept_front_to_back_ticket(self, ticket):
"""See ticket_interfaces.RearLink.accept_front_to_back_ticket for spec."""
with self._condition:
diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py
index c549317d15..1d76d0f9e0 100644
--- a/src/python/src/grpc/early_adopter/implementations.py
+++ b/src/python/src/grpc/early_adopter/implementations.py
@@ -70,7 +70,8 @@ class _Server(interfaces.Server):
self._pool, self._breakdown.request_deserializers,
self._breakdown.response_serializers, None,
((self._private_key, self._certificate_chain),), port=self._port)
- port = self._fore_link.start()
+ self._fore_link.start()
+ port = self._fore_link.port()
self._back = _tickets_implementations.back(
servicer, self._pool, self._pool, self._pool, _MEGA_TIMEOUT,
_MEGA_TIMEOUT)
diff --git a/src/python/src/grpc/framework/assembly/__init__.py b/src/python/src/grpc/framework/assembly/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/src/grpc/framework/assembly/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/src/grpc/framework/assembly/implementations.py b/src/python/src/grpc/framework/assembly/implementations.py
new file mode 100644
index 0000000000..461aa9c855
--- /dev/null
+++ b/src/python/src/grpc/framework/assembly/implementations.py
@@ -0,0 +1,305 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementations for assembling RPC framework values."""
+
+import threading
+
+from grpc.framework.assembly import interfaces
+from grpc.framework.base import util as base_utilities
+from grpc.framework.base.packets import implementations as tickets_implementations
+from grpc.framework.base.packets import interfaces as tickets_interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.face import implementations as face_implementations
+from grpc.framework.face import interfaces as face_interfaces
+from grpc.framework.face import utilities as face_utilities
+from grpc.framework.foundation import activated
+from grpc.framework.foundation import logging_pool
+
+_ONE_DAY_IN_SECONDS = 60 * 60 * 24
+_THREAD_POOL_SIZE = 100
+
+
+class _FaceStub(object):
+
+ def __init__(self, rear_link):
+ self._rear_link = rear_link
+ self._lock = threading.Lock()
+ self._pool = None
+ self._front = None
+ self._under_stub = None
+
+ def __enter__(self):
+ with self._lock:
+ self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ self._front = tickets_implementations.front(
+ self._pool, self._pool, self._pool)
+ self._rear_link.start()
+ self._rear_link.join_fore_link(self._front)
+ self._front.join_rear_link(self._rear_link)
+ self._under_stub = face_implementations.stub(self._front, self._pool)
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ with self._lock:
+ self._under_stub = None
+ self._rear_link.stop()
+ base_utilities.wait_for_idle(self._front)
+ self._front = None
+ self._pool.shutdown(wait=True)
+ self._pool = None
+ return False
+
+ def __getattr__(self, attr):
+ with self._lock:
+ if self._under_stub is None:
+ raise ValueError('Called out of context!')
+ else:
+ return getattr(self._under_stub, attr)
+
+
+def _behaviors(implementations, front, pool):
+ behaviors = {}
+ stub = face_implementations.stub(front, pool)
+ for name, implementation in implementations.iteritems():
+ if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+ behaviors[name] = stub.unary_unary_sync_async(name)
+ elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+ behaviors[name] = lambda request, context, bound_name=name: (
+ stub.inline_value_in_stream_out(bound_name, request, context))
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+ behaviors[name] = stub.stream_unary_sync_async(name)
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+ behaviors[name] = lambda request_iterator, context, bound_name=name: (
+ stub.inline_stream_in_stream_out(
+ bound_name, request_iterator, context))
+ return behaviors
+
+
+class _DynamicInlineStub(object):
+
+ def __init__(self, implementations, rear_link):
+ self._implementations = implementations
+ self._rear_link = rear_link
+ self._lock = threading.Lock()
+ self._pool = None
+ self._front = None
+ self._behaviors = None
+
+ def __enter__(self):
+ with self._lock:
+ self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ self._front = tickets_implementations.front(
+ self._pool, self._pool, self._pool)
+ self._rear_link.start()
+ self._rear_link.join_fore_link(self._front)
+ self._front.join_rear_link(self._rear_link)
+ self._behaviors = _behaviors(
+ self._implementations, self._front, self._pool)
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ with self._lock:
+ self._behaviors = None
+ self._rear_link.stop()
+ base_utilities.wait_for_idle(self._front)
+ self._front = None
+ self._pool.shutdown(wait=True)
+ self._pool = None
+ return False
+
+ def __getattr__(self, attr):
+ with self._lock:
+ behavior = self._behaviors.get(attr)
+ if behavior is None:
+ raise AttributeError(attr)
+ else:
+ return behavior
+
+
+def _servicer(implementations, pool):
+ inline_value_in_value_out_methods = {}
+ inline_value_in_stream_out_methods = {}
+ inline_stream_in_value_out_methods = {}
+ inline_stream_in_stream_out_methods = {}
+ event_value_in_value_out_methods = {}
+ event_value_in_stream_out_methods = {}
+ event_stream_in_value_out_methods = {}
+ event_stream_in_stream_out_methods = {}
+
+ for name, implementation in implementations.iteritems():
+ if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+ if implementation.style is style.Service.INLINE:
+ inline_value_in_value_out_methods[name] = (
+ face_utilities.inline_unary_unary_method(implementation.unary_unary_inline))
+ elif implementation.style is style.Service.EVENT:
+ event_value_in_value_out_methods[name] = (
+ face_utilities.event_unary_unary_method(implementation.unary_unary_event))
+ elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+ if implementation.style is style.Service.INLINE:
+ inline_value_in_stream_out_methods[name] = (
+ face_utilities.inline_unary_stream_method(implementation.unary_stream_inline))
+ elif implementation.style is style.Service.EVENT:
+ event_value_in_stream_out_methods[name] = (
+ face_utilities.event_unary_stream_method(implementation.unary_stream_event))
+ if implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+ if implementation.style is style.Service.INLINE:
+ inline_stream_in_value_out_methods[name] = (
+ face_utilities.inline_stream_unary_method(implementation.stream_unary_inline))
+ elif implementation.style is style.Service.EVENT:
+ event_stream_in_value_out_methods[name] = (
+ face_utilities.event_stream_unary_method(implementation.stream_unary_event))
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+ if implementation.style is style.Service.INLINE:
+ inline_stream_in_stream_out_methods[name] = (
+ face_utilities.inline_stream_stream_method(implementation.stream_stream_inline))
+ elif implementation.style is style.Service.EVENT:
+ event_stream_in_stream_out_methods[name] = (
+ face_utilities.event_stream_stream_method(implementation.stream_stream_event))
+
+ return face_implementations.servicer(
+ pool,
+ inline_value_in_value_out_methods=inline_value_in_value_out_methods,
+ inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
+ inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
+ inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
+ event_value_in_value_out_methods=event_value_in_value_out_methods,
+ event_value_in_stream_out_methods=event_value_in_stream_out_methods,
+ event_stream_in_value_out_methods=event_stream_in_value_out_methods,
+ event_stream_in_stream_out_methods=event_stream_in_stream_out_methods)
+
+
+class _ServiceAssembly(activated.Activated):
+
+ def __init__(self, implementations, fore_link):
+ self._implementations = implementations
+ self._fore_link = fore_link
+ self._lock = threading.Lock()
+ self._pool = None
+ self._back = None
+
+ def _start(self):
+ with self._lock:
+ self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ servicer = _servicer(self._implementations, self._pool)
+ self._back = tickets_implementations.back(
+ servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS,
+ _ONE_DAY_IN_SECONDS)
+ self._fore_link.start()
+ self._fore_link.join_rear_link(self._back)
+ self._back.join_fore_link(self._fore_link)
+
+ def _stop(self):
+ with self._lock:
+ self._fore_link.stop()
+ base_utilities.wait_for_idle(self._back)
+ self._back = None
+ self._pool.shutdown(wait=True)
+ self._pool = None
+
+ def __enter__(self):
+ self._start()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._stop()
+ return False
+
+ def start(self):
+ return self._start()
+
+ def stop(self):
+ self._stop()
+
+
+def assemble_face_stub(activated_rear_link):
+ """Assembles a face_interfaces.Stub.
+
+ The returned object is a context manager and may only be used in context to
+ invoke RPCs.
+
+ Args:
+ activated_rear_link: An object that is both a tickets_interfaces.RearLink
+ and an activated.Activated. The object should be in the inactive state
+ when passed to this method.
+
+ Returns:
+ A face_interfaces.Stub on which, in context, RPCs can be invoked.
+ """
+ return _FaceStub(activated_rear_link)
+
+
+def assemble_dynamic_inline_stub(implementations, activated_rear_link):
+ """Assembles a stub with method names for attributes.
+
+ The returned object is a context manager and may only be used in context to
+ invoke RPCs.
+
+ The returned object, when used in context, will respond to attribute access
+ as follows: if the requested attribute is the name of a unary-unary RPC
+ method, the value of the attribute will be a
+ face_interfaces.UnaryUnarySyncAsync with which to invoke the RPC method. If
+ the requested attribute is the name of a unary-stream RPC method, the value
+ of the attribute will be a callable with the semantics of
+ face_interfaces.Stub.inline_value_in_stream_out, minus the "name" parameter,
+ with which to invoke the RPC method. If the requested attribute is the name
+ of a stream-unary RPC method, the value of the attribute will be a
+ face_interfaces.StreamUnarySyncAsync with which to invoke the RPC method. If
+ the requested attribute is the name of a stream-stream RPC method, the value
+ of the attribute will be a callable with the semantics of
+ face_interfaces.Stub.inline_stream_in_stream_out, minus the "name" parameter,
+ with which to invoke the RPC method.
+
+ Args:
+ implementations: A dictionary from RPC method name to
+ interfaces.MethodImplementation.
+ activated_rear_link: An object that is both a tickets_interfaces.RearLink
+ and an activated.Activated. The object should be in the inactive state
+ when passed to this method.
+
+ Returns:
+ A stub on which, in context, RPCs can be invoked.
+ """
+ return _DynamicInlineStub(implementations, activated_rear_link)
+
+
+def assemble_service(implementations, activated_fore_link):
+ """Assembles the service-side of the RPC Framework stack.
+
+ Args:
+ implementations: A dictionary from RPC method name to
+ interfaces.MethodImplementation.
+ activated_fore_link: An object that is both a tickets_interfaces.ForeLink
+ and an activated.Activated. The object should be in the inactive state
+ when passed to this method.
+
+ Returns:
+ An activated.Activated value encapsulating RPC service.
+ """
+ return _ServiceAssembly(implementations, activated_fore_link)
diff --git a/src/python/src/grpc/framework/assembly/implementations_test.py b/src/python/src/grpc/framework/assembly/implementations_test.py
new file mode 100644
index 0000000000..74dc02ed83
--- /dev/null
+++ b/src/python/src/grpc/framework/assembly/implementations_test.py
@@ -0,0 +1,284 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# TODO(nathaniel): Expand this test coverage.
+
+"""Test of the GRPC-backed ForeLink and RearLink."""
+
+import threading
+import unittest
+
+from grpc.framework.assembly import implementations
+from grpc.framework.assembly import utilities
+from grpc.framework.base import interfaces
+from grpc.framework.base.packets import packets as tickets
+from grpc.framework.base.packets import interfaces as tickets_interfaces
+from grpc.framework.base.packets import null
+from grpc.framework.foundation import logging_pool
+from grpc._junkdrawer import math_pb2
+
+DIV = 'Div'
+DIV_MANY = 'DivMany'
+FIB = 'Fib'
+SUM = 'Sum'
+
+def _fibbonacci(limit):
+ left, right = 0, 1
+ for _ in xrange(limit):
+ yield left
+ left, right = right, left + right
+
+
+def _div(request, unused_context):
+ return math_pb2.DivReply(
+ quotient=request.dividend / request.divisor,
+ remainder=request.dividend % request.divisor)
+
+
+def _div_many(request_iterator, unused_context):
+ for request in request_iterator:
+ yield math_pb2.DivReply(
+ quotient=request.dividend / request.divisor,
+ remainder=request.dividend % request.divisor)
+
+
+def _fib(request, unused_context):
+ for number in _fibbonacci(request.limit):
+ yield math_pb2.Num(num=number)
+
+
+def _sum(request_iterator, unused_context):
+ accumulation = 0
+ for request in request_iterator:
+ accumulation += request.num
+ return math_pb2.Num(num=accumulation)
+
+
+_IMPLEMENTATIONS = {
+ DIV: utilities.unary_unary_inline(_div),
+ DIV_MANY: utilities.stream_stream_inline(_div_many),
+ FIB: utilities.unary_stream_inline(_fib),
+ SUM: utilities.stream_unary_inline(_sum),
+}
+
+_TIMEOUT = 10
+
+
+class PipeLink(tickets_interfaces.ForeLink, tickets_interfaces.RearLink):
+
+ def __init__(self):
+ self._fore_lock = threading.Lock()
+ self._fore_link = null.NULL_FORE_LINK
+ self._rear_lock = threading.Lock()
+ self._rear_link = null.NULL_REAR_LINK
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return False
+
+ def start(self):
+ pass
+
+ def stop(self):
+ pass
+
+ def accept_back_to_front_ticket(self, ticket):
+ with self._fore_lock:
+ self._fore_link.accept_back_to_front_ticket(ticket)
+
+ def join_rear_link(self, rear_link):
+ with self._rear_lock:
+ self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
+
+ def accept_front_to_back_ticket(self, ticket):
+ with self._rear_lock:
+ self._rear_link.accept_front_to_back_ticket(ticket)
+
+ def join_fore_link(self, fore_link):
+ with self._fore_lock:
+ self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
+
+
+class FaceStubTest(unittest.TestCase):
+
+ def testUnaryUnary(self):
+ divisor = 7
+ dividend = 13
+ expected_quotient = 1
+ expected_remainder = 6
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ face_stub = implementations.assemble_face_stub(pipe)
+
+ service.start()
+ try:
+ with face_stub:
+ response = face_stub.blocking_value_in_value_out(
+ DIV, math_pb2.DivArgs(divisor=divisor, dividend=dividend),
+ _TIMEOUT)
+ self.assertEqual(expected_quotient, response.quotient)
+ self.assertEqual(expected_remainder, response.remainder)
+ finally:
+ service.stop()
+
+ def testUnaryStream(self):
+ stream_length = 29
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ face_stub = implementations.assemble_face_stub(pipe)
+
+ with service, face_stub:
+ responses = list(
+ face_stub.inline_value_in_stream_out(
+ FIB, math_pb2.FibArgs(limit=stream_length), _TIMEOUT))
+ numbers = [response.num for response in responses]
+ for early, middle, later in zip(numbers, numbers[1:], numbers[2:]):
+ self.assertEqual(early + middle, later)
+
+ def testStreamUnary(self):
+ stream_length = 13
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ face_stub = implementations.assemble_face_stub(pipe)
+
+ with service, face_stub:
+ sync_async = face_stub.stream_unary_sync_async(SUM)
+ response_future = sync_async.async(
+ (math_pb2.Num(num=index) for index in range(stream_length)),
+ _TIMEOUT)
+ self.assertEqual(
+ (stream_length * (stream_length - 1)) / 2,
+ response_future.result().num)
+
+ def testStreamStream(self):
+ stream_length = 17
+ divisor_offset = 7
+ dividend_offset = 17
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ face_stub = implementations.assemble_face_stub(pipe)
+
+ with service, face_stub:
+ response_iterator = face_stub.inline_stream_in_stream_out(
+ DIV_MANY,
+ (math_pb2.DivArgs(
+ divisor=divisor_offset + index,
+ dividend=dividend_offset + index)
+ for index in range(stream_length)),
+ _TIMEOUT)
+ for index, response in enumerate(response_iterator):
+ self.assertEqual(
+ (dividend_offset + index) / (divisor_offset + index),
+ response.quotient)
+ self.assertEqual(
+ (dividend_offset + index) % (divisor_offset + index),
+ response.remainder)
+ self.assertEqual(stream_length, index + 1)
+
+
+class DynamicInlineStubTest(unittest.TestCase):
+
+ def testUnaryUnary(self):
+ divisor = 59
+ dividend = 973
+ expected_quotient = dividend / divisor
+ expected_remainder = dividend % divisor
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ dynamic_stub = implementations.assemble_dynamic_inline_stub(
+ _IMPLEMENTATIONS, pipe)
+
+ service.start()
+ with dynamic_stub:
+ response = dynamic_stub.Div(
+ math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT)
+ self.assertEqual(expected_quotient, response.quotient)
+ self.assertEqual(expected_remainder, response.remainder)
+ service.stop()
+
+ def testUnaryStream(self):
+ stream_length = 43
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ dynamic_stub = implementations.assemble_dynamic_inline_stub(
+ _IMPLEMENTATIONS, pipe)
+
+ with service, dynamic_stub:
+ response_iterator = dynamic_stub.Fib(
+ math_pb2.FibArgs(limit=stream_length), _TIMEOUT)
+ numbers = tuple(response.num for response in response_iterator)
+ for early, middle, later in zip(numbers, numbers[:1], numbers[:2]):
+ self.assertEqual(early + middle, later)
+ self.assertEqual(stream_length, len(numbers))
+
+ def testStreamUnary(self):
+ stream_length = 127
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ dynamic_stub = implementations.assemble_dynamic_inline_stub(
+ _IMPLEMENTATIONS, pipe)
+
+ with service, dynamic_stub:
+ response_future = dynamic_stub.Sum.async(
+ (math_pb2.Num(num=index) for index in range(stream_length)),
+ _TIMEOUT)
+ self.assertEqual(
+ (stream_length * (stream_length - 1)) / 2,
+ response_future.result().num)
+
+ def testStreamStream(self):
+ stream_length = 179
+ divisor_offset = 71
+ dividend_offset = 1763
+ pipe = PipeLink()
+ service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
+ dynamic_stub = implementations.assemble_dynamic_inline_stub(
+ _IMPLEMENTATIONS, pipe)
+
+ with service, dynamic_stub:
+ response_iterator = dynamic_stub.DivMany(
+ (math_pb2.DivArgs(
+ divisor=divisor_offset + index,
+ dividend=dividend_offset + index)
+ for index in range(stream_length)),
+ _TIMEOUT)
+ for index, response in enumerate(response_iterator):
+ self.assertEqual(
+ (dividend_offset + index) / (divisor_offset + index),
+ response.quotient)
+ self.assertEqual(
+ (dividend_offset + index) % (divisor_offset + index),
+ response.remainder)
+ self.assertEqual(stream_length, index + 1)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/python/src/grpc/framework/assembly/interfaces.py b/src/python/src/grpc/framework/assembly/interfaces.py
new file mode 100644
index 0000000000..e5d750b2bc
--- /dev/null
+++ b/src/python/src/grpc/framework/assembly/interfaces.py
@@ -0,0 +1,91 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# TODO(nathaniel): The assembly layer only exists to smooth out wrinkles in
+# the face layer. The two should be squashed together as soon as manageable.
+"""Interfaces for assembling RPC Framework values."""
+
+import abc
+
+# cardinality, style, and stream are referenced from specification in this
+# module.
+from grpc.framework.common import cardinality # pylint: disable=unused-import
+from grpc.framework.common import style # pylint: disable=unused-import
+from grpc.framework.foundation import stream # pylint: disable=unused-import
+
+
+class MethodImplementation(object):
+ """A sum type that describes an RPC method implementation.
+
+ Attributes:
+ cardinality: A cardinality.Cardinality value.
+ style: A style.Service value.
+ unary_unary_inline: The implementation of the RPC method as a callable
+ value that takes a request value and a face_interfaces.RpcContext object
+ and returns a response value. Only non-None if cardinality is
+ cardinality.Cardinality.UNARY_UNARY and style is style.Service.INLINE.
+ unary_stream_inline: The implementation of the RPC method as a callable
+ value that takes a request value and a face_interfaces.RpcContext object
+ and returns an iterator of response values. Only non-None if cardinality
+ is cardinality.Cardinality.UNARY_STREAM and style is
+ style.Service.INLINE.
+ stream_unary_inline: The implementation of the RPC method as a callable
+ value that takes an iterator of request values and a
+ face_interfaces.RpcContext object and returns a response value. Only
+ non-None if cardinality is cardinality.Cardinality.STREAM_UNARY and style
+ is style.Service.INLINE.
+ stream_stream_inline: The implementation of the RPC method as a callable
+ value that takes an iterator of request values and a
+ face_interfaces.RpcContext object and returns an iterator of response
+ values. Only non-None if cardinality is
+ cardinality.Cardinality.STREAM_STREAM and style is style.Service.INLINE.
+ unary_unary_event: The implementation of the RPC method as a callable value
+ that takes a request value, a response callback to which to pass the
+ response value of the RPC, and a face_interfaces.RpcContext. Only
+ non-None if cardinality is cardinality.Cardinality.UNARY_UNARY and style
+ is style.Service.EVENT.
+ unary_stream_event: The implementation of the RPC method as a callable
+ value that takes a request value, a stream.Consumer to which to pass the
+ the response values of the RPC, and a face_interfaces.RpcContext. Only
+ non-None if cardinality is cardinality.Cardinality.UNARY_STREAM and style
+ is style.Service.EVENT.
+ stream_unary_event: The implementation of the RPC method as a callable
+ value that takes a response callback to which to pass the response value
+ of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
+ to which the request values of the RPC should be passed. Only non-None if
+ cardinality is cardinality.Cardinality.STREAM_UNARY and style is
+ style.Service.EVENT.
+ stream_stream_event: The implementation of the RPC method as a callable
+ value that takes a stream.Consumer to which to pass the response values
+ of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
+ to which the request values of the RPC should be passed. Only non-None if
+ cardinality is cardinality.Cardinality.STREAM_STREAM and style is
+ style.Service.EVENT.
+ """
+ __metaclass__ = abc.ABCMeta
diff --git a/src/python/src/grpc/framework/assembly/utilities.py b/src/python/src/grpc/framework/assembly/utilities.py
new file mode 100644
index 0000000000..80e7f59c03
--- /dev/null
+++ b/src/python/src/grpc/framework/assembly/utilities.py
@@ -0,0 +1,179 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for assembling RPC framework values."""
+
+import collections
+
+from grpc.framework.assembly import interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.face import interfaces as face_interfaces
+from grpc.framework.foundation import stream
+
+
+class _MethodImplementation(
+ interfaces.MethodImplementation,
+ collections.namedtuple(
+ '_MethodImplementation',
+ ['cardinality', 'style', 'unary_unary_inline', 'unary_stream_inline',
+ 'stream_unary_inline', 'stream_stream_inline', 'unary_unary_event',
+ 'unary_stream_event', 'stream_unary_event', 'stream_stream_event',])):
+ pass
+
+
+def unary_unary_inline(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-unary RPC method as a callable value
+ that takes a request value and a face_interfaces.RpcContext object and
+ returns a response value.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_UNARY, style.Service.INLINE, behavior,
+ None, None, None, None, None, None, None)
+
+
+def unary_stream_inline(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-stream RPC method as a callable
+ value that takes a request value and a face_interfaces.RpcContext object
+ and returns an iterator of response values.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_STREAM, style.Service.INLINE, None,
+ behavior, None, None, None, None, None, None)
+
+
+def stream_unary_inline(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-unary RPC method as a callable
+ value that takes an iterator of request values and a
+ face_interfaces.RpcContext object and returns a response value.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_UNARY, style.Service.INLINE, None, None,
+ behavior, None, None, None, None, None)
+
+
+def stream_stream_inline(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-stream RPC method as a callable
+ value that takes an iterator of request values and a
+ face_interfaces.RpcContext object and returns an iterator of response
+ values.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_STREAM, style.Service.INLINE, None, None,
+ None, behavior, None, None, None, None)
+
+
+def unary_unary_event(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-unary RPC method as a callable
+ value that takes a request value, a response callback to which to pass
+ the response value of the RPC, and a face_interfaces.RpcContext.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_UNARY, style.Service.EVENT, None, None,
+ None, None, behavior, None, None, None)
+
+
+def unary_stream_event(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a unary-stream RPC method as a callable
+ value that takes a request value, a stream.Consumer to which to pass the
+ the response values of the RPC, and a face_interfaces.RpcContext.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.UNARY_STREAM, style.Service.EVENT, None, None,
+ None, None, None, behavior, None, None)
+
+
+def stream_unary_event(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-unary RPC method as a callable
+ value that takes a response callback to which to pass the response value
+ of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
+ to which the request values of the RPC should be passed.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_UNARY, style.Service.EVENT, None, None,
+ None, None, None, None, behavior, None)
+
+
+def stream_stream_event(behavior):
+ """Creates an interfaces.MethodImplementation for the given behavior.
+
+ Args:
+ behavior: The implementation of a stream-stream RPC method as a callable
+ value that takes a stream.Consumer to which to pass the response values
+ of the RPC and a face_interfaces.RpcContext and returns a stream.Consumer
+ to which the request values of the RPC should be passed.
+
+ Returns:
+ An interfaces.MethodImplementation derived from the given behavior.
+ """
+ return _MethodImplementation(
+ cardinality.Cardinality.STREAM_STREAM, style.Service.EVENT, None, None,
+ None, None, None, None, None, behavior)
diff --git a/src/python/src/grpc/framework/common/style.py b/src/python/src/grpc/framework/common/style.py
new file mode 100644
index 0000000000..6ae694bdcb
--- /dev/null
+++ b/src/python/src/grpc/framework/common/style.py
@@ -0,0 +1,40 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Defines an enum for classifying RPC methods by control flow semantics."""
+
+import enum
+
+
+@enum.unique
+class Service(enum.Enum):
+ """Describes the control flow style of RPC method implementation."""
+
+ INLINE = 'inline'
+ EVENT = 'event'
diff --git a/src/python/src/grpc/framework/face/utilities.py b/src/python/src/grpc/framework/face/utilities.py
new file mode 100644
index 0000000000..5e34be37da
--- /dev/null
+++ b/src/python/src/grpc/framework/face/utilities.py
@@ -0,0 +1,221 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for the face layer of RPC Framework."""
+
+# stream is referenced from specification in this module.
+from grpc.framework.face import interfaces
+from grpc.framework.foundation import stream # pylint: disable=unused-import
+
+
+class _InlineUnaryUnaryMethod(interfaces.InlineValueInValueOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request, context):
+ return self._behavior(request, context)
+
+
+class _InlineUnaryStreamMethod(interfaces.InlineValueInStreamOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request, context):
+ return self._behavior(request, context)
+
+
+class _InlineStreamUnaryMethod(interfaces.InlineStreamInValueOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request_iterator, context):
+ return self._behavior(request_iterator, context)
+
+
+class _InlineStreamStreamMethod(interfaces.InlineStreamInStreamOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request_iterator, context):
+ return self._behavior(request_iterator, context)
+
+
+class _EventUnaryUnaryMethod(interfaces.EventValueInValueOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request, response_callback, context):
+ return self._behavior(request, response_callback, context)
+
+
+class _EventUnaryStreamMethod(interfaces.EventValueInStreamOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, request, response_consumer, context):
+ return self._behavior(request, response_consumer, context)
+
+
+class _EventStreamUnaryMethod(interfaces.EventStreamInValueOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, response_callback, context):
+ return self._behavior(response_callback, context)
+
+
+class _EventStreamStreamMethod(interfaces.EventStreamInStreamOutMethod):
+
+ def __init__(self, behavior):
+ self._behavior = behavior
+
+ def service(self, response_consumer, context):
+ return self._behavior(response_consumer, context)
+
+
+def inline_unary_unary_method(behavior):
+ """Creates an interfaces.InlineValueInValueOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a unary-unary RPC method as a callable
+ value that takes a request value and an interfaces.RpcContext object and
+ returns a response value.
+
+ Returns:
+ An interfaces.InlineValueInValueOutMethod derived from the given behavior.
+ """
+ return _InlineUnaryUnaryMethod(behavior)
+
+
+def inline_unary_stream_method(behavior):
+ """Creates an interfaces.InlineValueInStreamOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a unary-stream RPC method as a callable
+ value that takes a request value and an interfaces.RpcContext object and
+ returns an iterator of response values.
+
+ Returns:
+ An interfaces.InlineValueInStreamOutMethod derived from the given behavior.
+ """
+ return _InlineUnaryStreamMethod(behavior)
+
+
+def inline_stream_unary_method(behavior):
+ """Creates an interfaces.InlineStreamInValueOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a stream-unary RPC method as a callable
+ value that takes an iterator of request values and an
+ interfaces.RpcContext object and returns a response value.
+
+ Returns:
+ An interfaces.InlineStreamInValueOutMethod derived from the given behavior.
+ """
+ return _InlineStreamUnaryMethod(behavior)
+
+
+def inline_stream_stream_method(behavior):
+ """Creates an interfaces.InlineStreamInStreamOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a stream-stream RPC method as a callable
+ value that takes an iterator of request values and an
+ interfaces.RpcContext object and returns an iterator of response values.
+
+ Returns:
+ An interfaces.InlineStreamInStreamOutMethod derived from the given
+ behavior.
+ """
+ return _InlineStreamStreamMethod(behavior)
+
+
+def event_unary_unary_method(behavior):
+ """Creates an interfaces.EventValueInValueOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a unary-unary RPC method as a callable
+ value that takes a request value, a response callback to which to pass
+ the response value of the RPC, and an interfaces.RpcContext.
+
+ Returns:
+ An interfaces.EventValueInValueOutMethod derived from the given behavior.
+ """
+ return _EventUnaryUnaryMethod(behavior)
+
+
+def event_unary_stream_method(behavior):
+ """Creates an interfaces.EventValueInStreamOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a unary-stream RPC method as a callable
+ value that takes a request value, a stream.Consumer to which to pass the
+ response values of the RPC, and an interfaces.RpcContext.
+
+ Returns:
+ An interfaces.EventValueInStreamOutMethod derived from the given behavior.
+ """
+ return _EventUnaryStreamMethod(behavior)
+
+
+def event_stream_unary_method(behavior):
+ """Creates an interfaces.EventStreamInValueOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a stream-unary RPC method as a callable
+ value that takes a response callback to which to pass the response value
+ of the RPC and an interfaces.RpcContext and returns a stream.Consumer to
+ which the request values of the RPC should be passed.
+
+ Returns:
+ An interfaces.EventStreamInValueOutMethod derived from the given behavior.
+ """
+ return _EventStreamUnaryMethod(behavior)
+
+
+def event_stream_stream_method(behavior):
+ """Creates an interfaces.EventStreamInStreamOutMethod from a behavior.
+
+ Args:
+ behavior: The implementation of a stream-stream RPC method as a callable
+ value that takes a stream.Consumer to which to pass the response values
+ of the RPC and an interfaces.RpcContext and returns a stream.Consumer to
+ which the request values of the RPC should be passed.
+
+ Returns:
+ An interfaces.EventStreamInStreamOutMethod derived from the given behavior.
+ """
+ return _EventStreamStreamMethod(behavior)
diff --git a/src/python/src/grpc/framework/foundation/activated.py b/src/python/src/grpc/framework/foundation/activated.py
new file mode 100644
index 0000000000..426a71c705
--- /dev/null
+++ b/src/python/src/grpc/framework/foundation/activated.py
@@ -0,0 +1,65 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces related to streams of values or objects."""
+
+import abc
+
+
+class Activated(object):
+ """Interface for objects that may be started and stopped.
+
+ Values implementing this type must also implement the context manager
+ protocol.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __enter__(self):
+ """See the context manager protocol for specification."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """See the context manager protocol for specification."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def start(self):
+ """Activates this object.
+
+ Returns:
+ A value equal to the value returned by this object's __enter__ method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stop(self):
+ """Deactivates this object."""
+ raise NotImplementedError()
diff --git a/src/python/src/setup.py b/src/python/src/setup.py
index 8e33ebb31c..e3f13fa5c8 100644
--- a/src/python/src/setup.py
+++ b/src/python/src/setup.py
@@ -56,12 +56,13 @@ _EXTENSION_MODULE = _core.Extension(
libraries=_EXTENSION_LIBRARIES,
)
-_PACKAGES=(
+_PACKAGES = (
'grpc',
'grpc._adapter',
'grpc._junkdrawer',
'grpc.early_adopter',
'grpc.framework',
+ 'grpc.framework.assembly',
'grpc.framework.base',
'grpc.framework.base.packets',
'grpc.framework.common',