aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-08-27 00:17:05 -0400
committerGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-08-27 00:17:05 -0400
commit967caaada09ab68acb6242311549012f1a07bbff (patch)
tree05892725470e96ec8e3e08ed79114664f65d319f
parentd739ece688dc1105f90b0d7f4ba25563dd78760e (diff)
parent4c8288ec010ed79bb50659fb6010b92a385e24ad (diff)
Merge pull request #3052 from nathanielmanistaatgoogle/crust
The RPC Framework crust package
-rw-r--r--src/python/grpcio/grpc/framework/crust/__init__.py30
-rw-r--r--src/python/grpcio/grpc/framework/crust/_calls.py204
-rw-r--r--src/python/grpcio/grpc/framework/crust/_control.py545
-rw-r--r--src/python/grpcio/grpc/framework/crust/_service.py166
-rw-r--r--src/python/grpcio/grpc/framework/crust/implementations.py352
-rw-r--r--src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py2
-rw-r--r--src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py160
-rw-r--r--src/python/grpcio_test/grpc_test/framework/_crust_over_core_face_interface_test.py111
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_3069_test_constant.py37
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py9
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py11
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py17
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py2
-rwxr-xr-xtools/run_tests/run_python.sh2
14 files changed, 1630 insertions, 18 deletions
diff --git a/src/python/grpcio/grpc/framework/crust/__init__.py b/src/python/grpcio/grpc/framework/crust/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/crust/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio/grpc/framework/crust/_calls.py b/src/python/grpcio/grpc/framework/crust/_calls.py
new file mode 100644
index 0000000000..f9077bedfe
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/crust/_calls.py
@@ -0,0 +1,204 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utility functions for invoking RPCs."""
+
+from grpc.framework.crust import _control
+from grpc.framework.interfaces.base import utilities
+from grpc.framework.interfaces.face import face
+
+_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!'
+
+_EMPTY_COMPLETION = utilities.completion(None, None, None)
+
+
+def _invoke(end, group, method, timeout, initial_metadata, payload, complete):
+ rendezvous = _control.Rendezvous(None, None)
+ operation_context, operator = end.operate(
+ group, method, utilities.full_subscription(rendezvous), timeout,
+ initial_metadata=initial_metadata, payload=payload,
+ completion=_EMPTY_COMPLETION if complete else None)
+ rendezvous.set_operator_and_context(operator, operation_context)
+ outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
+ if outcome is not None:
+ rendezvous.set_outcome(outcome)
+ return rendezvous, operation_context, outcome
+
+
+def _event_return_unary(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool):
+ if outcome is None:
+ def in_pool():
+ abortion = rendezvous.add_abortion_callback(abortion_callback)
+ if abortion is None:
+ try:
+ receiver.initial_metadata(rendezvous.initial_metadata())
+ receiver.response(next(rendezvous))
+ receiver.complete(
+ rendezvous.terminal_metadata(), rendezvous.code(),
+ rendezvous.details())
+ except face.AbortionError:
+ pass
+ else:
+ abortion_callback(abortion)
+ pool.submit(_control.pool_wrap(in_pool, operation_context))
+ return rendezvous
+
+
+def _event_return_stream(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool):
+ if outcome is None:
+ def in_pool():
+ abortion = rendezvous.add_abortion_callback(abortion_callback)
+ if abortion is None:
+ try:
+ receiver.initial_metadata(rendezvous.initial_metadata())
+ for response in rendezvous:
+ receiver.response(response)
+ receiver.complete(
+ rendezvous.terminal_metadata(), rendezvous.code(),
+ rendezvous.details())
+ except face.AbortionError:
+ pass
+ else:
+ abortion_callback(abortion)
+ pool.submit(_control.pool_wrap(in_pool, operation_context))
+ return rendezvous
+
+
+def blocking_unary_unary(
+ end, group, method, timeout, with_call, initial_metadata, payload):
+ """Services in a blocking fashion a unary-unary servicer method."""
+ rendezvous, unused_operation_context, unused_outcome = _invoke(
+ end, group, method, timeout, initial_metadata, payload, True)
+ if with_call:
+ return next(rendezvous, rendezvous)
+ else:
+ return next(rendezvous)
+
+
+def future_unary_unary(end, group, method, timeout, initial_metadata, payload):
+ """Services a value-in value-out servicer method by returning a Future."""
+ rendezvous, unused_operation_context, unused_outcome = _invoke(
+ end, group, method, timeout, initial_metadata, payload, True)
+ return rendezvous
+
+
+def inline_unary_stream(end, group, method, timeout, initial_metadata, payload):
+ """Services a value-in stream-out servicer method."""
+ rendezvous, unused_operation_context, unused_outcome = _invoke(
+ end, group, method, timeout, initial_metadata, payload, True)
+ return rendezvous
+
+
+def blocking_stream_unary(
+ end, group, method, timeout, with_call, initial_metadata, payload_iterator,
+ pool):
+ """Services in a blocking fashion a stream-in value-out servicer method."""
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, initial_metadata, None, False)
+ if outcome is None:
+ def in_pool():
+ for payload in payload_iterator:
+ rendezvous.consume(payload)
+ rendezvous.terminate()
+ pool.submit(_control.pool_wrap(in_pool, operation_context))
+ if with_call:
+ return next(rendezvous), rendezvous
+ else:
+ return next(rendezvous)
+ else:
+ if with_call:
+ return next(rendezvous), rendezvous
+ else:
+ return next(rendezvous)
+
+
+def future_stream_unary(
+ end, group, method, timeout, initial_metadata, payload_iterator, pool):
+ """Services a stream-in value-out servicer method by returning a Future."""
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, initial_metadata, None, False)
+ if outcome is None:
+ def in_pool():
+ for payload in payload_iterator:
+ rendezvous.consume(payload)
+ rendezvous.terminate()
+ pool.submit(_control.pool_wrap(in_pool, operation_context))
+ return rendezvous
+
+
+def inline_stream_stream(
+ end, group, method, timeout, initial_metadata, payload_iterator, pool):
+ """Services a stream-in stream-out servicer method."""
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, initial_metadata, None, False)
+ if outcome is None:
+ def in_pool():
+ for payload in payload_iterator:
+ rendezvous.consume(payload)
+ rendezvous.terminate()
+ pool.submit(_control.pool_wrap(in_pool, operation_context))
+ return rendezvous
+
+
+def event_unary_unary(
+ end, group, method, timeout, initial_metadata, payload, receiver,
+ abortion_callback, pool):
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, initial_metadata, payload, True)
+ return _event_return_unary(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
+
+
+def event_unary_stream(
+ end, group, method, timeout, initial_metadata, payload,
+ receiver, abortion_callback, pool):
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, initial_metadata, payload, True)
+ return _event_return_stream(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
+
+
+def event_stream_unary(
+ end, group, method, timeout, initial_metadata, receiver, abortion_callback,
+ pool):
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, initial_metadata, None, False)
+ return _event_return_unary(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
+
+
+def event_stream_stream(
+ end, group, method, timeout, initial_metadata, receiver, abortion_callback,
+ pool):
+ rendezvous, operation_context, outcome = _invoke(
+ end, group, method, timeout, initial_metadata, None, False)
+ return _event_return_stream(
+ receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
diff --git a/src/python/grpcio/grpc/framework/crust/_control.py b/src/python/grpcio/grpc/framework/crust/_control.py
new file mode 100644
index 0000000000..01de3c15bd
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/crust/_control.py
@@ -0,0 +1,545 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for translating between sync and async control flow."""
+
+import collections
+import enum
+import sys
+import threading
+import time
+
+from grpc.framework.foundation import abandonment
+from grpc.framework.foundation import callable_util
+from grpc.framework.foundation import future
+from grpc.framework.foundation import stream
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.base import utilities
+from grpc.framework.interfaces.face import face
+
+_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!'
+_INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Crust) Internal Error! )-:'
+
+_CANNOT_SET_INITIAL_METADATA = (
+ 'Could not set initial metadata - has it already been set, or has a ' +
+ 'payload already been sent?')
+_CANNOT_SET_TERMINAL_METADATA = (
+ 'Could not set terminal metadata - has it already been set, or has RPC ' +
+ 'completion already been indicated?')
+_CANNOT_SET_CODE = (
+ 'Could not set code - has it already been set, or has RPC completion ' +
+ 'already been indicated?')
+_CANNOT_SET_DETAILS = (
+ 'Could not set details - has it already been set, or has RPC completion ' +
+ 'already been indicated?')
+
+
+class _DummyOperator(base.Operator):
+
+ def advance(
+ self, initial_metadata=None, payload=None, completion=None,
+ allowance=None):
+ pass
+
+_DUMMY_OPERATOR = _DummyOperator()
+
+
+class _Awaited(
+ collections.namedtuple('_Awaited', ('kind', 'value',))):
+
+ @enum.unique
+ class Kind(enum.Enum):
+ NOT_YET_ARRIVED = 'not yet arrived'
+ ARRIVED = 'arrived'
+
+_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None)
+_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None)
+
+
+class _Transitory(
+ collections.namedtuple('_Transitory', ('kind', 'value',))):
+
+ @enum.unique
+ class Kind(enum.Enum):
+ NOT_YET_SEEN = 'not yet seen'
+ PRESENT = 'present'
+ GONE = 'gone'
+
+_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None)
+_GONE = _Transitory(_Transitory.Kind.GONE, None)
+
+
+class _Termination(
+ collections.namedtuple(
+ '_Termination', ('terminated', 'abortion', 'abortion_error',))):
+ """Values indicating whether and how an RPC has terminated.
+
+ Attributes:
+ terminated: A boolean indicating whether or not the RPC has terminated.
+ abortion: A face.Abortion value describing the RPC's abortion or None if the
+ RPC did not abort.
+ abortion_error: A face.AbortionError describing the RPC's abortion or None
+ if the RPC did not abort.
+ """
+
+_NOT_TERMINATED = _Termination(False, None, None)
+
+_OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR = {
+ base.Outcome.COMPLETED: lambda *unused_args: _Termination(True, None, None),
+ base.Outcome.CANCELLED: lambda *args: _Termination(
+ True, face.Abortion(face.Abortion.Kind.CANCELLED, *args),
+ face.CancellationError(*args)),
+ base.Outcome.EXPIRED: lambda *args: _Termination(
+ True, face.Abortion(face.Abortion.Kind.EXPIRED, *args),
+ face.ExpirationError(*args)),
+ base.Outcome.LOCAL_SHUTDOWN: lambda *args: _Termination(
+ True, face.Abortion(face.Abortion.Kind.LOCAL_SHUTDOWN, *args),
+ face.LocalShutdownError(*args)),
+ base.Outcome.REMOTE_SHUTDOWN: lambda *args: _Termination(
+ True, face.Abortion(face.Abortion.Kind.REMOTE_SHUTDOWN, *args),
+ face.RemoteShutdownError(*args)),
+ base.Outcome.RECEPTION_FAILURE: lambda *args: _Termination(
+ True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
+ face.NetworkError(*args)),
+ base.Outcome.TRANSMISSION_FAILURE: lambda *args: _Termination(
+ True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
+ face.NetworkError(*args)),
+ base.Outcome.LOCAL_FAILURE: lambda *args: _Termination(
+ True, face.Abortion(face.Abortion.Kind.LOCAL_FAILURE, *args),
+ face.LocalError(*args)),
+ base.Outcome.REMOTE_FAILURE: lambda *args: _Termination(
+ True, face.Abortion(face.Abortion.Kind.REMOTE_FAILURE, *args),
+ face.RemoteError(*args)),
+}
+
+
+def _wait_once_until(condition, until):
+ if until is None:
+ condition.wait()
+ else:
+ remaining = until - time.time()
+ if remaining < 0:
+ raise future.TimeoutError()
+ else:
+ condition.wait(timeout=remaining)
+
+
+def _done_callback_as_operation_termination_callback(
+ done_callback, rendezvous):
+ def operation_termination_callback(operation_outcome):
+ rendezvous.set_outcome(operation_outcome)
+ done_callback(rendezvous)
+ return operation_termination_callback
+
+
+def _abortion_callback_as_operation_termination_callback(
+ rpc_abortion_callback, rendezvous_set_outcome):
+ def operation_termination_callback(operation_outcome):
+ termination = rendezvous_set_outcome(operation_outcome)
+ if termination.abortion is not None:
+ rpc_abortion_callback(termination.abortion)
+ return operation_termination_callback
+
+
+class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call):
+ """A rendez-vous for the threads of an operation.
+
+ Instances of this object present iterator and stream.Consumer interfaces for
+ interacting with application code and present a base.Operator interface and
+ maintain a base.Operator internally for interacting with base interface code.
+ """
+
+ def __init__(self, operator, operation_context):
+ self._condition = threading.Condition()
+
+ self._operator = operator
+ self._operation_context = operation_context
+
+ self._up_initial_metadata = _NOT_YET_ARRIVED
+ self._up_payload = None
+ self._up_allowance = 1
+ self._up_completion = _NOT_YET_ARRIVED
+ self._down_initial_metadata = _NOT_YET_SEEN
+ self._down_payload = None
+ self._down_allowance = 1
+ self._down_terminal_metadata = _NOT_YET_SEEN
+ self._down_code = _NOT_YET_SEEN
+ self._down_details = _NOT_YET_SEEN
+
+ self._termination = _NOT_TERMINATED
+
+ # The semantics of future.Future.cancel and future.Future.cancelled are
+ # slightly wonky, so they have to be tracked separately from the rest of the
+ # result of the RPC. This field tracks whether cancellation was requested
+ # prior to termination of the RPC
+ self._cancelled = False
+
+ def set_operator_and_context(self, operator, operation_context):
+ with self._condition:
+ self._operator = operator
+ self._operation_context = operation_context
+
+ def _down_completion(self):
+ if self._down_terminal_metadata.kind is _Transitory.Kind.NOT_YET_SEEN:
+ terminal_metadata = None
+ self._down_terminal_metadata = _GONE
+ elif self._down_terminal_metadata.kind is _Transitory.Kind.PRESENT:
+ terminal_metadata = self._down_terminal_metadata.value
+ self._down_terminal_metadata = _GONE
+ else:
+ terminal_metadata = None
+ if self._down_code.kind is _Transitory.Kind.NOT_YET_SEEN:
+ code = None
+ self._down_code = _GONE
+ elif self._down_code.kind is _Transitory.Kind.PRESENT:
+ code = self._down_code.value
+ self._down_code = _GONE
+ else:
+ code = None
+ if self._down_details.kind is _Transitory.Kind.NOT_YET_SEEN:
+ details = None
+ self._down_details = _GONE
+ elif self._down_details.kind is _Transitory.Kind.PRESENT:
+ details = self._down_details.value
+ self._down_details = _GONE
+ else:
+ details = None
+ return utilities.completion(terminal_metadata, code, details)
+
+ def _set_outcome(self, outcome):
+ if not self._termination.terminated:
+ self._operator = _DUMMY_OPERATOR
+ self._operation_context = None
+ self._down_initial_metadata = _GONE
+ self._down_payload = None
+ self._down_terminal_metadata = _GONE
+ self._down_code = _GONE
+ self._down_details = _GONE
+
+ if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
+ initial_metadata = None
+ else:
+ initial_metadata = self._up_initial_metadata.value
+ if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
+ terminal_metadata, code, details = None, None, None
+ else:
+ terminal_metadata = self._up_completion.value.terminal_metadata
+ code = self._up_completion.value.code
+ details = self._up_completion.value.message
+ self._termination = _OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR[
+ outcome](initial_metadata, terminal_metadata, code, details)
+
+ self._condition.notify_all()
+
+ return self._termination
+
+ def advance(
+ self, initial_metadata=None, payload=None, completion=None,
+ allowance=None):
+ with self._condition:
+ if initial_metadata is not None:
+ self._up_initial_metadata = _Awaited(
+ _Awaited.Kind.ARRIVED, initial_metadata)
+ if payload is not None:
+ if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
+ self._up_initial_metadata = _ARRIVED_AND_NONE
+ self._up_payload = payload
+ self._up_allowance -= 1
+ if completion is not None:
+ if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
+ self._up_initial_metadata = _ARRIVED_AND_NONE
+ self._up_completion = _Awaited(
+ _Awaited.Kind.ARRIVED, completion)
+ if allowance is not None:
+ if self._down_payload is not None:
+ self._operator.advance(payload=self._down_payload)
+ self._down_payload = None
+ self._down_allowance += allowance - 1
+ else:
+ self._down_allowance += allowance
+ self._condition.notify_all()
+
+ def cancel(self):
+ with self._condition:
+ if self._operation_context is not None:
+ self._operation_context.cancel()
+ self._cancelled = True
+ return False
+
+ def cancelled(self):
+ with self._condition:
+ return self._cancelled
+
+ def running(self):
+ with self._condition:
+ return not self._termination.terminated
+
+ def done(self):
+ with self._condition:
+ return self._termination.terminated
+
+ def result(self, timeout=None):
+ until = None if timeout is None else time.time() + timeout
+ with self._condition:
+ while True:
+ if self._termination.terminated:
+ if self._termination.abortion is None:
+ return self._up_payload
+ elif self._termination.abortion.kind is face.Abortion.Kind.CANCELLED:
+ raise future.CancelledError()
+ else:
+ raise self._termination.abortion_error # pylint: disable=raising-bad-type
+ else:
+ _wait_once_until(self._condition, until)
+
+ def exception(self, timeout=None):
+ until = None if timeout is None else time.time() + timeout
+ with self._condition:
+ while True:
+ if self._termination.terminated:
+ if self._termination.abortion is None:
+ return None
+ else:
+ return self._termination.abortion_error
+ else:
+ _wait_once_until(self._condition, until)
+
+ def traceback(self, timeout=None):
+ until = None if timeout is None else time.time() + timeout
+ with self._condition:
+ while True:
+ if self._termination.terminated:
+ if self._termination.abortion_error is None:
+ return None
+ else:
+ abortion_error = self._termination.abortion_error
+ break
+ else:
+ _wait_once_until(self._condition, until)
+
+ try:
+ raise abortion_error
+ except face.AbortionError:
+ return sys.exc_info()[2]
+
+ def add_done_callback(self, fn):
+ with self._condition:
+ if self._operation_context is not None:
+ outcome = self._operation_context.add_termination_callback(
+ _done_callback_as_operation_termination_callback(fn, self))
+ if outcome is None:
+ return
+ else:
+ self._set_outcome(outcome)
+
+ fn(self)
+
+ def consume(self, value):
+ with self._condition:
+ while True:
+ if self._termination.terminated:
+ return
+ elif 0 < self._down_allowance:
+ self._operator.advance(payload=value)
+ self._down_allowance -= 1
+ return
+ else:
+ self._condition.wait()
+
+ def terminate(self):
+ with self._condition:
+ if self._termination.terminated:
+ return
+ elif self._down_code.kind is _Transitory.Kind.GONE:
+ # Conform to specified idempotence of terminate by ignoring extra calls.
+ return
+ else:
+ completion = self._down_completion()
+ self._operator.advance(completion=completion)
+
+ def consume_and_terminate(self, value):
+ with self._condition:
+ while True:
+ if self._termination.terminated:
+ return
+ elif 0 < self._down_allowance:
+ completion = self._down_completion()
+ self._operator.advance(payload=value, completion=completion)
+ return
+ else:
+ self._condition.wait()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ with self._condition:
+ while True:
+ if self._termination.abortion_error is not None:
+ raise self._termination.abortion_error
+ elif self._up_payload is not None:
+ payload = self._up_payload
+ self._up_payload = None
+ if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
+ self._operator.advance(allowance=1)
+ return payload
+ elif self._up_completion.kind is _Awaited.Kind.ARRIVED:
+ raise StopIteration()
+ else:
+ self._condition.wait()
+
+ def is_active(self):
+ with self._condition:
+ return not self._termination.terminated
+
+ def time_remaining(self):
+ if self._operation_context is None:
+ return 0
+ else:
+ return self._operation_context.time_remaining()
+
+ def add_abortion_callback(self, abortion_callback):
+ with self._condition:
+ if self._operation_context is None:
+ return self._termination.abortion
+ else:
+ outcome = self._operation_context.add_termination_callback(
+ _abortion_callback_as_operation_termination_callback(
+ abortion_callback, self.set_outcome))
+ if outcome is not None:
+ return self._set_outcome(outcome).abortion
+ else:
+ return self._termination.abortion
+
+ def initial_metadata(self):
+ with self._condition:
+ while True:
+ if self._up_initial_metadata.kind is _Awaited.Kind.ARRIVED:
+ return self._up_initial_metadata.value
+ elif self._termination.terminated:
+ return None
+ else:
+ self._condition.wait()
+
+ def terminal_metadata(self):
+ with self._condition:
+ while True:
+ if self._up_completion.kind is _Awaited.Kind.ARRIVED:
+ return self._up_completion.value.terminal_metadata
+ elif self._termination.terminated:
+ return None
+ else:
+ self._condition.wait()
+
+ def code(self):
+ with self._condition:
+ while True:
+ if self._up_completion.kind is _Awaited.Kind.ARRIVED:
+ return self._up_completion.value.code
+ elif self._termination.terminated:
+ return None
+ else:
+ self._condition.wait()
+
+ def details(self):
+ with self._condition:
+ while True:
+ if self._up_completion.kind is _Awaited.Kind.ARRIVED:
+ return self._up_completion.value.message
+ elif self._termination.terminated:
+ return None
+ else:
+ self._condition.wait()
+
+ def set_initial_metadata(self, initial_metadata):
+ with self._condition:
+ if (self._down_initial_metadata.kind is not
+ _Transitory.Kind.NOT_YET_SEEN):
+ raise ValueError(_CANNOT_SET_INITIAL_METADATA)
+ else:
+ self._down_initial_metadata = _GONE
+ self._operator.advance(initial_metadata=initial_metadata)
+
+ def set_terminal_metadata(self, terminal_metadata):
+ with self._condition:
+ if (self._down_terminal_metadata.kind is not
+ _Transitory.Kind.NOT_YET_SEEN):
+ raise ValueError(_CANNOT_SET_TERMINAL_METADATA)
+ else:
+ self._down_terminal_metadata = _Transitory(
+ _Transitory.Kind.PRESENT, terminal_metadata)
+
+ def set_code(self, code):
+ with self._condition:
+ if self._down_code.kind is not _Transitory.Kind.NOT_YET_SEEN:
+ raise ValueError(_CANNOT_SET_CODE)
+ else:
+ self._down_code = _Transitory(_Transitory.Kind.PRESENT, code)
+
+ def set_details(self, details):
+ with self._condition:
+ if self._down_details.kind is not _Transitory.Kind.NOT_YET_SEEN:
+ raise ValueError(_CANNOT_SET_DETAILS)
+ else:
+ self._down_details = _Transitory(_Transitory.Kind.PRESENT, details)
+
+ def set_outcome(self, outcome):
+ with self._condition:
+ return self._set_outcome(outcome)
+
+
+def pool_wrap(behavior, operation_context):
+ """Wraps an operation-related behavior so that it may be called in a pool.
+
+ Args:
+ behavior: A callable related to carrying out an operation.
+ operation_context: A base_interfaces.OperationContext for the operation.
+
+ Returns:
+ A callable that when called carries out the behavior of the given callable
+ and handles whatever exceptions it raises appropriately.
+ """
+ def translation(*args):
+ try:
+ behavior(*args)
+ except (
+ abandonment.Abandoned,
+ face.CancellationError,
+ face.ExpirationError,
+ face.LocalShutdownError,
+ face.RemoteShutdownError,
+ face.NetworkError,
+ face.RemoteError,
+ ) as e:
+ if operation_context.outcome() is None:
+ operation_context.fail(e)
+ except Exception as e:
+ operation_context.fail(e)
+ return callable_util.with_exceptions_logged(
+ translation, _INTERNAL_ERROR_LOG_MESSAGE)
diff --git a/src/python/grpcio/grpc/framework/crust/_service.py b/src/python/grpcio/grpc/framework/crust/_service.py
new file mode 100644
index 0000000000..2455a58f59
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/crust/_service.py
@@ -0,0 +1,166 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Behaviors for servicing RPCs."""
+
+from grpc.framework.crust import _control
+from grpc.framework.foundation import abandonment
+from grpc.framework.interfaces.base import utilities
+from grpc.framework.interfaces.face import face
+
+
+class _ServicerContext(face.ServicerContext):
+
+ def __init__(self, rendezvous):
+ self._rendezvous = rendezvous
+
+ def is_active(self):
+ return self._rendezvous.is_active()
+
+ def time_remaining(self):
+ return self._rendezvous.time_remaining()
+
+ def add_abortion_callback(self, abortion_callback):
+ return self._rendezvous.add_abortion_callback(abortion_callback)
+
+ def cancel(self):
+ self._rendezvous.cancel()
+
+ def invocation_metadata(self):
+ return self._rendezvous.initial_metadata()
+
+ def initial_metadata(self, initial_metadata):
+ self._rendezvous.set_initial_metadata(initial_metadata)
+
+ def terminal_metadata(self, terminal_metadata):
+ self._rendezvous.set_terminal_metadata(terminal_metadata)
+
+ def code(self, code):
+ self._rendezvous.set_code(code)
+
+ def details(self, details):
+ self._rendezvous.set_details(details)
+
+
+def _adaptation(pool, in_pool):
+ def adaptation(operator, operation_context):
+ rendezvous = _control.Rendezvous(operator, operation_context)
+ outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
+ if outcome is None:
+ pool.submit(_control.pool_wrap(in_pool, operation_context), rendezvous)
+ return utilities.full_subscription(rendezvous)
+ else:
+ raise abandonment.Abandoned()
+ return adaptation
+
+
+def adapt_inline_unary_unary(method, pool):
+ def in_pool(rendezvous):
+ request = next(rendezvous)
+ response = method(request, _ServicerContext(rendezvous))
+ rendezvous.consume_and_terminate(response)
+ return _adaptation(pool, in_pool)
+
+
+def adapt_inline_unary_stream(method, pool):
+ def in_pool(rendezvous):
+ request = next(rendezvous)
+ response_iterator = method(request, _ServicerContext(rendezvous))
+ for response in response_iterator:
+ rendezvous.consume(response)
+ rendezvous.terminate()
+ return _adaptation(pool, in_pool)
+
+
+def adapt_inline_stream_unary(method, pool):
+ def in_pool(rendezvous):
+ response = method(rendezvous, _ServicerContext(rendezvous))
+ rendezvous.consume_and_terminate(response)
+ return _adaptation(pool, in_pool)
+
+
+def adapt_inline_stream_stream(method, pool):
+ def in_pool(rendezvous):
+ response_iterator = method(rendezvous, _ServicerContext(rendezvous))
+ for response in response_iterator:
+ rendezvous.consume(response)
+ rendezvous.terminate()
+ return _adaptation(pool, in_pool)
+
+
+def adapt_event_unary_unary(method, pool):
+ def in_pool(rendezvous):
+ request = next(rendezvous)
+ method(
+ request, rendezvous.consume_and_terminate, _ServicerContext(rendezvous))
+ return _adaptation(pool, in_pool)
+
+
+def adapt_event_unary_stream(method, pool):
+ def in_pool(rendezvous):
+ request = next(rendezvous)
+ method(request, rendezvous, _ServicerContext(rendezvous))
+ return _adaptation(pool, in_pool)
+
+
+def adapt_event_stream_unary(method, pool):
+ def in_pool(rendezvous):
+ request_consumer = method(
+ rendezvous.consume_and_terminate, _ServicerContext(rendezvous))
+ for request in rendezvous:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ return _adaptation(pool, in_pool)
+
+
+def adapt_event_stream_stream(method, pool):
+ def in_pool(rendezvous):
+ request_consumer = method(rendezvous, _ServicerContext(rendezvous))
+ for request in rendezvous:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ return _adaptation(pool, in_pool)
+
+
+def adapt_multi_method(multi_method, pool):
+ def adaptation(group, method, operator, operation_context):
+ rendezvous = _control.Rendezvous(operator, operation_context)
+ outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
+ if outcome is None:
+ def in_pool():
+ request_consumer = multi_method(
+ group, method, rendezvous, _ServicerContext(rendezvous))
+ for request in rendezvous:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ pool.submit(_control.pool_wrap(in_pool, operation_context), rendezvous)
+ return utilities.full_subscription(rendezvous)
+ else:
+ raise abandonment.Abandoned()
+ return adaptation
diff --git a/src/python/grpcio/grpc/framework/crust/implementations.py b/src/python/grpcio/grpc/framework/crust/implementations.py
new file mode 100644
index 0000000000..12f7e79641
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/crust/implementations.py
@@ -0,0 +1,352 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Entry points into the Crust layer of RPC Framework."""
+
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.crust import _calls
+from grpc.framework.crust import _service
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.face import face
+
+
+class _BaseServicer(base.Servicer):
+
+ def __init__(self, adapted_methods, adapted_multi_method):
+ self._adapted_methods = adapted_methods
+ self._adapted_multi_method = adapted_multi_method
+
+ def service(self, group, method, context, output_operator):
+ adapted_method = self._adapted_methods.get((group, method), None)
+ if adapted_method is not None:
+ return adapted_method(output_operator, context)
+ elif self._adapted_multi_method is not None:
+ try:
+ return self._adapted_multi_method.service(
+ group, method, output_operator, context)
+ except face.NoSuchMethodError:
+ raise base.NoSuchMethodError()
+ else:
+ raise base.NoSuchMethodError()
+
+
+class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
+
+ def __init__(self, end, group, method, pool):
+ self._end = end
+ self._group = group
+ self._method = method
+ self._pool = pool
+
+ def __call__(
+ self, request, timeout, metadata=None, with_call=False):
+ return _calls.blocking_unary_unary(
+ self._end, self._group, self._method, timeout, with_call,
+ metadata, request)
+
+ def future(self, request, timeout, metadata=None):
+ return _calls.future_unary_unary(
+ self._end, self._group, self._method, timeout, metadata,
+ request)
+
+ def event(
+ self, request, receiver, abortion_callback, timeout,
+ metadata=None):
+ return _calls.event_unary_unary(
+ self._end, self._group, self._method, timeout, metadata,
+ request, receiver, abortion_callback, self._pool)
+
+
+class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
+
+ def __init__(self, end, group, method, pool):
+ self._end = end
+ self._group = group
+ self._method = method
+ self._pool = pool
+
+ def __call__(self, request, timeout, metadata=None):
+ return _calls.inline_unary_stream(
+ self._end, self._group, self._method, timeout, metadata,
+ request)
+
+ def event(
+ self, request, receiver, abortion_callback, timeout,
+ metadata=None):
+ return _calls.event_unary_stream(
+ self._end, self._group, self._method, timeout, metadata,
+ request, receiver, abortion_callback, self._pool)
+
+
+class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
+
+ def __init__(self, end, group, method, pool):
+ self._end = end
+ self._group = group
+ self._method = method
+ self._pool = pool
+
+ def __call__(
+ self, request_iterator, timeout, metadata=None,
+ with_call=False):
+ return _calls.blocking_stream_unary(
+ self._end, self._group, self._method, timeout, with_call,
+ metadata, request_iterator, self._pool)
+
+ def future(self, request_iterator, timeout, metadata=None):
+ return _calls.future_stream_unary(
+ self._end, self._group, self._method, timeout, metadata,
+ request_iterator, self._pool)
+
+ def event(
+ self, receiver, abortion_callback, timeout, metadata=None):
+ return _calls.event_stream_unary(
+ self._end, self._group, self._method, timeout, metadata,
+ receiver, abortion_callback, self._pool)
+
+
+class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
+
+ def __init__(self, end, group, method, pool):
+ self._end = end
+ self._group = group
+ self._method = method
+ self._pool = pool
+
+ def __call__(self, request_iterator, timeout, metadata=None):
+ return _calls.inline_stream_stream(
+ self._end, self._group, self._method, timeout, metadata,
+ request_iterator, self._pool)
+
+ def event(
+ self, receiver, abortion_callback, timeout, metadata=None):
+ return _calls.event_stream_stream(
+ self._end, self._group, self._method, timeout, metadata,
+ receiver, abortion_callback, self._pool)
+
+
+class _GenericStub(face.GenericStub):
+ """An face.GenericStub implementation."""
+
+ def __init__(self, end, pool):
+ self._end = end
+ self._pool = pool
+
+ def blocking_unary_unary(
+ self, group, method, request, timeout, metadata=None,
+ with_call=None):
+ return _calls.blocking_unary_unary(
+ self._end, group, method, timeout, with_call, metadata,
+ request)
+
+ def future_unary_unary(
+ self, group, method, request, timeout, metadata=None):
+ return _calls.future_unary_unary(
+ self._end, group, method, timeout, metadata, request)
+
+ def inline_unary_stream(
+ self, group, method, request, timeout, metadata=None):
+ return _calls.inline_unary_stream(
+ self._end, group, method, timeout, metadata, request)
+
+ def blocking_stream_unary(
+ self, group, method, request_iterator, timeout, metadata=None,
+ with_call=None):
+ return _calls.blocking_stream_unary(
+ self._end, group, method, timeout, with_call, metadata,
+ request_iterator, self._pool)
+
+ def future_stream_unary(
+ self, group, method, request_iterator, timeout, metadata=None):
+ return _calls.future_stream_unary(
+ self._end, group, method, timeout, metadata,
+ request_iterator, self._pool)
+
+ def inline_stream_stream(
+ self, group, method, request_iterator, timeout, metadata=None):
+ return _calls.inline_stream_stream(
+ self._end, group, method, timeout, metadata,
+ request_iterator, self._pool)
+
+ def event_unary_unary(
+ self, group, method, request, receiver, abortion_callback, timeout,
+ metadata=None):
+ return _calls.event_unary_unary(
+ self._end, group, method, timeout, metadata, request,
+ receiver, abortion_callback, self._pool)
+
+ def event_unary_stream(
+ self, group, method, request, receiver, abortion_callback, timeout,
+ metadata=None):
+ return _calls.event_unary_stream(
+ self._end, group, method, timeout, metadata, request,
+ receiver, abortion_callback, self._pool)
+
+ def event_stream_unary(
+ self, group, method, receiver, abortion_callback, timeout,
+ metadata=None):
+ return _calls.event_stream_unary(
+ self._end, group, method, timeout, metadata, receiver,
+ abortion_callback, self._pool)
+
+ def event_stream_stream(
+ self, group, method, receiver, abortion_callback, timeout,
+ metadata=None):
+ return _calls.event_stream_stream(
+ self._end, group, method, timeout, metadata, receiver,
+ abortion_callback, self._pool)
+
+ def unary_unary(self, group, method):
+ return _UnaryUnaryMultiCallable(self._end, group, method, self._pool)
+
+ def unary_stream(self, group, method):
+ return _UnaryStreamMultiCallable(self._end, group, method, self._pool)
+
+ def stream_unary(self, group, method):
+ return _StreamUnaryMultiCallable(self._end, group, method, self._pool)
+
+ def stream_stream(self, group, method):
+ return _StreamStreamMultiCallable(self._end, group, method, self._pool)
+
+
+class _DynamicStub(face.DynamicStub):
+ """An face.DynamicStub implementation."""
+
+ def __init__(self, end, group, cardinalities, pool):
+ self._end = end
+ self._group = group
+ self._cardinalities = cardinalities
+ self._pool = pool
+
+ def __getattr__(self, attr):
+ method_cardinality = self._cardinalities.get(attr)
+ if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
+ return _UnaryUnaryMultiCallable(self._end, self._group, attr, self._pool)
+ elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
+ return _UnaryStreamMultiCallable(self._end, self._group, attr, self._pool)
+ elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
+ return _StreamUnaryMultiCallable(self._end, self._group, attr, self._pool)
+ elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
+ return _StreamStreamMultiCallable(
+ self._end, self._group, attr, self._pool)
+ else:
+ raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr)
+
+
+def _adapt_method_implementations(method_implementations, pool):
+ adapted_implementations = {}
+ for name, method_implementation in method_implementations.iteritems():
+ if method_implementation.style is style.Service.INLINE:
+ if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+ adapted_implementations[name] = _service.adapt_inline_unary_unary(
+ method_implementation.unary_unary_inline, pool)
+ elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+ adapted_implementations[name] = _service.adapt_inline_unary_stream(
+ method_implementation.unary_stream_inline, pool)
+ elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+ adapted_implementations[name] = _service.adapt_inline_stream_unary(
+ method_implementation.stream_unary_inline, pool)
+ elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+ adapted_implementations[name] = _service.adapt_inline_stream_stream(
+ method_implementation.stream_stream_inline, pool)
+ elif method_implementation.style is style.Service.EVENT:
+ if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+ adapted_implementations[name] = _service.adapt_event_unary_unary(
+ method_implementation.unary_unary_event, pool)
+ elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+ adapted_implementations[name] = _service.adapt_event_unary_stream(
+ method_implementation.unary_stream_event, pool)
+ elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+ adapted_implementations[name] = _service.adapt_event_stream_unary(
+ method_implementation.stream_unary_event, pool)
+ elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+ adapted_implementations[name] = _service.adapt_event_stream_stream(
+ method_implementation.stream_stream_event, pool)
+ return adapted_implementations
+
+
+def servicer(method_implementations, multi_method_implementation, pool):
+ """Creates a base.Servicer.
+
+ It is guaranteed that any passed face.MultiMethodImplementation will
+ only be called to service an RPC if there is no
+ face.MethodImplementation for the RPC method in the passed
+ method_implementations dictionary.
+
+ Args:
+ method_implementations: A dictionary from RPC method name to
+ face.MethodImplementation object to be used to service the named
+ RPC method.
+ multi_method_implementation: An face.MultiMethodImplementation to be
+ used to service any RPCs not serviced by the
+ face.MethodImplementations given in the method_implementations
+ dictionary, or None.
+ pool: A thread pool.
+
+ Returns:
+ A base.Servicer that services RPCs via the given implementations.
+ """
+ adapted_implementations = _adapt_method_implementations(
+ method_implementations, pool)
+ adapted_multi_method_implementation = _service.adapt_multi_method(
+ multi_method_implementation, pool)
+ return _BaseServicer(
+ adapted_implementations, adapted_multi_method_implementation)
+
+
+def generic_stub(end, pool):
+ """Creates an face.GenericStub.
+
+ Args:
+ end: A base.End.
+ pool: A futures.ThreadPoolExecutor.
+
+ Returns:
+ A face.GenericStub that performs RPCs via the given base.End.
+ """
+ return _GenericStub(end, pool)
+
+
+def dynamic_stub(end, group, cardinalities, pool):
+ """Creates an face.DynamicStub.
+
+ Args:
+ end: A base.End.
+ group: The group identifier for all RPCs to be made with the created
+ face.DynamicStub.
+ cardinalities: A dict from method identifier to cardinality.Cardinality
+ value identifying the cardinality of every RPC method to be supported by
+ the created face.DynamicStub.
+ pool: A futures.ThreadPoolExecutor.
+
+ Returns:
+ A face.DynamicStub that performs RPCs via the given base.End.
+ """
+ return _DynamicStub(end, group, cardinalities, pool)
diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
index 72b1ae5642..7fa90fe35f 100644
--- a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
+++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
@@ -27,7 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Tests the RPC Framework Core's implementation of the Base interface."""
+"""Tests Base interface compliance of the core-over-gRPC-links stack."""
import collections
import logging
diff --git a/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py b/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py
new file mode 100644
index 0000000000..25b99cbbaf
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py
@@ -0,0 +1,160 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests Face compliance of the crust-over-core-over-gRPC-links stack."""
+
+import collections
+import unittest
+
+from grpc._adapter import _intermediary_low
+from grpc._links import invocation
+from grpc._links import service
+from grpc.framework.core import implementations as core_implementations
+from grpc.framework.crust import implementations as crust_implementations
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.links import utilities
+from grpc_test import test_common
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.face import test_cases
+from grpc_test.framework.interfaces.face import test_interfaces
+from grpc_test.framework.interfaces.links import test_utilities
+
+
+class _SerializationBehaviors(
+ collections.namedtuple(
+ '_SerializationBehaviors',
+ ('request_serializers', 'request_deserializers', 'response_serializers',
+ 'response_deserializers',))):
+ pass
+
+
+def _serialization_behaviors_from_test_methods(test_methods):
+ request_serializers = {}
+ request_deserializers = {}
+ response_serializers = {}
+ response_deserializers = {}
+ for (group, method), test_method in test_methods.iteritems():
+ request_serializers[group, method] = test_method.serialize_request
+ request_deserializers[group, method] = test_method.deserialize_request
+ response_serializers[group, method] = test_method.serialize_response
+ response_deserializers[group, method] = test_method.deserialize_response
+ return _SerializationBehaviors(
+ request_serializers, request_deserializers, response_serializers,
+ response_deserializers)
+
+
+class _Implementation(test_interfaces.Implementation):
+
+ def instantiate(
+ self, methods, method_implementations, multi_method_implementation):
+ pool = logging_pool.pool(test_constants.POOL_SIZE)
+ servicer = crust_implementations.servicer(
+ method_implementations, multi_method_implementation, pool)
+ serialization_behaviors = _serialization_behaviors_from_test_methods(
+ methods)
+ invocation_end_link = core_implementations.invocation_end_link()
+ service_end_link = core_implementations.service_end_link(
+ servicer, test_constants.DEFAULT_TIMEOUT,
+ test_constants.MAXIMUM_TIMEOUT)
+ service_grpc_link = service.service_link(
+ serialization_behaviors.request_deserializers,
+ serialization_behaviors.response_serializers)
+ port = service_grpc_link.add_port(0, None)
+ channel = _intermediary_low.Channel('localhost:%d' % port, None)
+ invocation_grpc_link = invocation.invocation_link(
+ channel, b'localhost',
+ serialization_behaviors.request_serializers,
+ serialization_behaviors.response_deserializers)
+
+ invocation_end_link.join_link(invocation_grpc_link)
+ invocation_grpc_link.join_link(invocation_end_link)
+ service_grpc_link.join_link(service_end_link)
+ service_end_link.join_link(service_grpc_link)
+ service_end_link.start()
+ invocation_end_link.start()
+ invocation_grpc_link.start()
+ service_grpc_link.start()
+
+ generic_stub = crust_implementations.generic_stub(invocation_end_link, pool)
+ # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
+ group = next(iter(methods))[0]
+ # TODO(nathaniel): Add a "cardinalities_by_group" attribute to
+ # _digest.TestServiceDigest.
+ cardinalities = {
+ method: method_object.cardinality()
+ for (group, method), method_object in methods.iteritems()}
+ dynamic_stub = crust_implementations.dynamic_stub(
+ invocation_end_link, group, cardinalities, pool)
+
+ return generic_stub, {group: dynamic_stub}, (
+ invocation_end_link, invocation_grpc_link, service_grpc_link,
+ service_end_link, pool)
+
+ def destantiate(self, memo):
+ (invocation_end_link, invocation_grpc_link, service_grpc_link,
+ service_end_link, pool) = memo
+ invocation_end_link.stop(0).wait()
+ invocation_grpc_link.stop()
+ service_grpc_link.stop_gracefully()
+ service_end_link.stop(0).wait()
+ invocation_end_link.join_link(utilities.NULL_LINK)
+ invocation_grpc_link.join_link(utilities.NULL_LINK)
+ service_grpc_link.join_link(utilities.NULL_LINK)
+ service_end_link.join_link(utilities.NULL_LINK)
+ pool.shutdown(wait=True)
+
+ def invocation_metadata(self):
+ return test_common.INVOCATION_INITIAL_METADATA
+
+ def initial_metadata(self):
+ return test_common.SERVICE_INITIAL_METADATA
+
+ def terminal_metadata(self):
+ return test_common.SERVICE_TERMINAL_METADATA
+
+ def code(self):
+ return _intermediary_low.Code.OK
+
+ def details(self):
+ return test_common.DETAILS
+
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ return original_metadata is None or grpc_test_common.metadata_transmitted(
+ original_metadata, transmitted_metadata)
+
+
+def load_tests(loader, tests, pattern):
+ return unittest.TestSuite(
+ tests=tuple(
+ loader.loadTestsFromTestCase(test_case_class)
+ for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/framework/_crust_over_core_face_interface_test.py b/src/python/grpcio_test/grpc_test/framework/_crust_over_core_face_interface_test.py
new file mode 100644
index 0000000000..30bb85f6c3
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/_crust_over_core_face_interface_test.py
@@ -0,0 +1,111 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests Face interface compliance of the crust-over-core stack."""
+
+import collections
+import unittest
+
+from grpc.framework.core import implementations as core_implementations
+from grpc.framework.crust import implementations as crust_implementations
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.links import utilities
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.face import test_cases
+from grpc_test.framework.interfaces.face import test_interfaces
+from grpc_test.framework.interfaces.links import test_utilities
+
+
+class _Implementation(test_interfaces.Implementation):
+
+ def instantiate(
+ self, methods, method_implementations, multi_method_implementation):
+ pool = logging_pool.pool(test_constants.POOL_SIZE)
+ servicer = crust_implementations.servicer(
+ method_implementations, multi_method_implementation, pool)
+
+ service_end_link = core_implementations.service_end_link(
+ servicer, test_constants.DEFAULT_TIMEOUT,
+ test_constants.MAXIMUM_TIMEOUT)
+ invocation_end_link = core_implementations.invocation_end_link()
+ invocation_end_link.join_link(service_end_link)
+ service_end_link.join_link(invocation_end_link)
+ service_end_link.start()
+ invocation_end_link.start()
+
+ generic_stub = crust_implementations.generic_stub(invocation_end_link, pool)
+ # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
+ group = next(iter(methods))[0]
+ # TODO(nathaniel): Add a "cardinalities_by_group" attribute to
+ # _digest.TestServiceDigest.
+ cardinalities = {
+ method: method_object.cardinality()
+ for (group, method), method_object in methods.iteritems()}
+ dynamic_stub = crust_implementations.dynamic_stub(
+ invocation_end_link, group, cardinalities, pool)
+
+ return generic_stub, {group: dynamic_stub}, (
+ invocation_end_link, service_end_link, pool)
+
+ def destantiate(self, memo):
+ invocation_end_link, service_end_link, pool = memo
+ invocation_end_link.stop(0).wait()
+ service_end_link.stop(0).wait()
+ invocation_end_link.join_link(utilities.NULL_LINK)
+ service_end_link.join_link(utilities.NULL_LINK)
+ pool.shutdown(wait=True)
+
+ def invocation_metadata(self):
+ return object()
+
+ def initial_metadata(self):
+ return object()
+
+ def terminal_metadata(self):
+ return object()
+
+ def code(self):
+ return object()
+
+ def details(self):
+ return object()
+
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ return original_metadata is transmitted_metadata
+
+
+def load_tests(loader, tests, pattern):
+ return unittest.TestSuite(
+ tests=tuple(
+ loader.loadTestsFromTestCase(test_case_class)
+ for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_3069_test_constant.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_3069_test_constant.py
new file mode 100644
index 0000000000..363d9ce8f1
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_3069_test_constant.py
@@ -0,0 +1,37 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A test constant working around issue 3069."""
+
+# test_constants is referenced from specification in this module.
+from grpc_test.framework.common import test_constants # pylint: disable=unused-import
+
+# TODO(issue 3069): Replace uses of this constant with
+# test_constants.SHORT_TIMEOUT.
+REALLY_SHORT_TIMEOUT = 0.1
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
index 857ad5cf3e..8804f3f223 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -37,6 +37,7 @@ from grpc.framework.interfaces.face import face
from grpc_test.framework.common import test_constants
from grpc_test.framework.common import test_control
from grpc_test.framework.common import test_coverage
+from grpc_test.framework.interfaces.face import _3069_test_constant
from grpc_test.framework.interfaces.face import _digest
from grpc_test.framework.interfaces.face import _stock_service
from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
@@ -170,7 +171,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(), self.assertRaises(
face.ExpirationError):
self._invoker.blocking(group, method)(
- request, test_constants.SHORT_TIMEOUT)
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
def testExpiredUnaryRequestStreamResponse(self):
for (group, method), test_messages_sequence in (
@@ -181,7 +182,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(), self.assertRaises(
face.ExpirationError):
response_iterator = self._invoker.blocking(group, method)(
- request, test_constants.SHORT_TIMEOUT)
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator)
def testExpiredStreamRequestUnaryResponse(self):
@@ -193,7 +194,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(), self.assertRaises(
face.ExpirationError):
self._invoker.blocking(group, method)(
- iter(requests), test_constants.SHORT_TIMEOUT)
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
def testExpiredStreamRequestStreamResponse(self):
for (group, method), test_messages_sequence in (
@@ -204,7 +205,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause(), self.assertRaises(
face.ExpirationError):
response_iterator = self._invoker.blocking(group, method)(
- iter(requests), test_constants.SHORT_TIMEOUT)
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator)
def testFailedUnaryRequestUnaryResponse(self):
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py
index ea5cdeaea3..5a78b4bed2 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py
@@ -37,6 +37,7 @@ from grpc.framework.interfaces.face import face
from grpc_test.framework.common import test_constants
from grpc_test.framework.common import test_control
from grpc_test.framework.common import test_coverage
+from grpc_test.framework.interfaces.face import _3069_test_constant
from grpc_test.framework.interfaces.face import _digest
from grpc_test.framework.interfaces.face import _receiver
from grpc_test.framework.interfaces.face import _stock_service
@@ -264,7 +265,8 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause():
self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+ request, receiver, receiver.abort,
+ _3069_test_constant.REALLY_SHORT_TIMEOUT)
receiver.block_until_terminated()
self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
@@ -278,7 +280,8 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause():
self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+ request, receiver, receiver.abort,
+ _3069_test_constant.REALLY_SHORT_TIMEOUT)
receiver.block_until_terminated()
self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
@@ -290,7 +293,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
receiver = _receiver.Receiver()
self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+ receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT)
receiver.block_until_terminated()
self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
@@ -303,7 +306,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
receiver = _receiver.Receiver()
call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+ receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT)
for request in requests:
call_consumer.consume(request)
receiver.block_until_terminated()
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
index a649362cef..d1107e1576 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -40,6 +40,7 @@ from grpc.framework.interfaces.face import face
from grpc_test.framework.common import test_constants
from grpc_test.framework.common import test_control
from grpc_test.framework.common import test_coverage
+from grpc_test.framework.interfaces.face import _3069_test_constant
from grpc_test.framework.interfaces.face import _digest
from grpc_test.framework.interfaces.face import _stock_service
from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
@@ -265,7 +266,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause():
response_future = self._invoker.future(
- group, method)(request, test_constants.SHORT_TIMEOUT)
+ group, method)(request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
self.assertIsInstance(
response_future.exception(), face.ExpirationError)
with self.assertRaises(face.ExpirationError):
@@ -279,7 +280,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause():
response_iterator = self._invoker.future(group, method)(
- request, test_constants.SHORT_TIMEOUT)
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
with self.assertRaises(face.ExpirationError):
list(response_iterator)
@@ -291,7 +292,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause():
response_future = self._invoker.future(group, method)(
- iter(requests), test_constants.SHORT_TIMEOUT)
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
self.assertIsInstance(
response_future.exception(), face.ExpirationError)
with self.assertRaises(face.ExpirationError):
@@ -305,7 +306,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.pause():
response_iterator = self._invoker.future(group, method)(
- iter(requests), test_constants.SHORT_TIMEOUT)
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
with self.assertRaises(face.ExpirationError):
list(response_iterator)
@@ -317,7 +318,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.fail():
response_future = self._invoker.future(group, method)(
- request, test_constants.SHORT_TIMEOUT)
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is
@@ -340,7 +341,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
# expiration of the RPC.
with self._control.fail(), self.assertRaises(face.ExpirationError):
response_iterator = self._invoker.future(group, method)(
- request, test_constants.SHORT_TIMEOUT)
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator)
def testFailedStreamRequestUnaryResponse(self):
@@ -351,7 +352,7 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
with self._control.fail():
response_future = self._invoker.future(group, method)(
- iter(requests), test_constants.SHORT_TIMEOUT)
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is
@@ -374,5 +375,5 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
# expiration of the RPC.
with self._control.fail(), self.assertRaises(face.ExpirationError):
response_iterator = self._invoker.future(group, method)(
- iter(requests), test_constants.SHORT_TIMEOUT)
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
list(response_iterator)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py
index 1dd2ec3633..808e2c4e36 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py
@@ -1,4 +1,4 @@
-B# Copyright 2015, Google Inc.
+# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh
index 858f300800..fe5685f793 100755
--- a/tools/run_tests/run_python.sh
+++ b/tools/run_tests/run_python.sh
@@ -45,6 +45,8 @@ source "python"$PYVER"_virtual_environment"/bin/activate
# py.test (or find another tool or *something*) that's acceptable to the rest of
# the team...
"python"$PYVER -m grpc_test._core_over_links_base_interface_test
+"python"$PYVER -m grpc_test._crust_over_core_over_links_face_interface_test
+"python"$PYVER -m grpc_test.framework._crust_over_core_face_interface_test
"python"$PYVER -m grpc_test.framework.core._base_interface_test
"python"$PYVER $GRPCIO_TEST/setup.py test -a "-n8 --cov=grpc --junitxml=./report.xml --timeout=300"