aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
Diffstat (limited to 'src/python')
-rw-r--r--src/python/README.md8
-rw-r--r--src/python/grpcio/commands.py2
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types.h2
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types/call.c15
-rw-r--r--src/python/grpcio/grpc/_adapter/_intermediary_low.py11
-rw-r--r--src/python/grpcio/grpc/_adapter/_low.py3
-rw-r--r--src/python/grpcio/grpc/_adapter/fore.py4
-rw-r--r--src/python/grpcio/grpc/_adapter/rear.py6
-rw-r--r--src/python/grpcio/grpc/_links/_constants.py42
-rw-r--r--src/python/grpcio/grpc/_links/invocation.py65
-rw-r--r--src/python/grpcio/grpc/_links/service.py52
-rw-r--r--src/python/grpcio/grpc/beta/_connectivity_channel.py16
-rw-r--r--src/python/grpcio/grpc/beta/_server.py48
-rw-r--r--src/python/grpcio/grpc/beta/_stub.py6
-rw-r--r--src/python/grpcio/grpc/beta/implementations.py (renamed from src/python/grpcio/grpc/beta/beta.py)142
-rw-r--r--src/python/grpcio/grpc/beta/interfaces.py214
-rw-r--r--src/python/grpcio/grpc/beta/utilities.py21
-rw-r--r--src/python/grpcio/grpc/framework/core/_constants.py17
-rw-r--r--src/python/grpcio/grpc/framework/core/_context.py10
-rw-r--r--src/python/grpcio/grpc/framework/core/_emission.py8
-rw-r--r--src/python/grpcio/grpc/framework/core/_end.py26
-rw-r--r--src/python/grpcio/grpc/framework/core/_expiration.py4
-rw-r--r--src/python/grpcio/grpc/framework/core/_ingestion.py48
-rw-r--r--src/python/grpcio/grpc/framework/core/_interfaces.py69
-rw-r--r--src/python/grpcio/grpc/framework/core/_operation.py34
-rw-r--r--src/python/grpcio/grpc/framework/core/_protocol.py176
-rw-r--r--src/python/grpcio/grpc/framework/core/_reception.py46
-rw-r--r--src/python/grpcio/grpc/framework/core/_termination.py34
-rw-r--r--src/python/grpcio/grpc/framework/core/_transmission.py115
-rw-r--r--src/python/grpcio/grpc/framework/core/_utilities.py8
-rw-r--r--src/python/grpcio/grpc/framework/crust/_calls.py73
-rw-r--r--src/python/grpcio/grpc/framework/crust/_control.py62
-rw-r--r--src/python/grpcio/grpc/framework/crust/_service.py13
-rw-r--r--src/python/grpcio/grpc/framework/crust/implementations.py122
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/base/base.py62
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/base/utilities.py13
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/face/face.py99
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/links/links.py18
-rw-r--r--src/python/grpcio/requirements.txt1
-rw-r--r--src/python/grpcio/setup.py2
-rw-r--r--src/python/grpcio_health_checking/setup.py4
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py4
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py8
-rw-r--r--src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py7
-rw-r--r--src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py3
-rw-r--r--src/python/grpcio_test/grpc_test/_links/_transmission_test.py10
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_beta_features_test.py232
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py59
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_face_interface_test.py23
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_not_found_test.py75
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_utilities_test.py6
-rw-r--r--src/python/grpcio_test/grpc_test/beta/test_utilities.py12
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py20
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py19
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py30
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py8
-rw-r--r--src/python/grpcio_test/requirements.txt5
-rw-r--r--src/python/grpcio_test/setup.py4
58 files changed, 1674 insertions, 572 deletions
diff --git a/src/python/README.md b/src/python/README.md
index a21deb33ef..afe7c731f1 100644
--- a/src/python/README.md
+++ b/src/python/README.md
@@ -4,7 +4,7 @@ The Python facility of gRPC.
Status
-------
-Alpha : Ready for early adopters
+Beta : Core behavior well-used and proven; bugs lurk off the beaten path.
PREREQUISITES
-------------
@@ -16,10 +16,10 @@ INSTALLATION
**Linux (Debian):**
-Add [Debian testing][] to your `sources.list` file. Example:
+Add [Debian jessie-backports][] to your `sources.list` file. Example:
```sh
-echo "deb http://ftp.us.debian.org/debian testing main contrib non-free" | \
+echo "deb http://http.debian.net/debian jessie-backports main" | \
sudo tee -a /etc/apt/sources.list
```
@@ -92,4 +92,4 @@ $ ../../tools/distrib/python/submit.py
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
[Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html
[detailed example]:http://www.grpc.io/docs/installation/python.html
-[Debian testing]:https://www.debian.org/releases/stretch/
+[Debian jessie-backports]:http://backports.debian.org/Instructions/
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py
index 89c0fbf0f3..8a2f2d6283 100644
--- a/src/python/grpcio/commands.py
+++ b/src/python/grpcio/commands.py
@@ -64,7 +64,7 @@ class SphinxDocumentation(setuptools.Command):
import sphinx.apidoc
metadata = self.distribution.metadata
src_dir = os.path.join(
- os.getcwd(), self.distribution.package_dir['grpc'])
+ os.getcwd(), self.distribution.package_dir[''], 'grpc')
sys.path.append(src_dir)
sphinx.apidoc.main([
'', '--force', '--full', '-H', metadata.name, '-A', metadata.author,
diff --git a/src/python/grpcio/grpc/_adapter/_c/types.h b/src/python/grpcio/grpc/_adapter/_c/types.h
index ec0687a9fd..31fd470d36 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types.h
+++ b/src/python/grpcio/grpc/_adapter/_c/types.h
@@ -112,6 +112,8 @@ void pygrpc_Call_dealloc(Call *self);
PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Call_peer(Call *self);
+PyObject *pygrpc_Call_set_credentials(Call *self, PyObject *args,
+ PyObject *kwargs);
extern PyTypeObject pygrpc_Call_type;
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/call.c b/src/python/grpcio/grpc/_adapter/_c/types/call.c
index 42a50151f6..5604aba39d 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/call.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/call.c
@@ -43,6 +43,8 @@ PyMethodDef pygrpc_Call_methods[] = {
{"start_batch", (PyCFunction)pygrpc_Call_start_batch, METH_KEYWORDS, ""},
{"cancel", (PyCFunction)pygrpc_Call_cancel, METH_KEYWORDS, ""},
{"peer", (PyCFunction)pygrpc_Call_peer, METH_NOARGS, ""},
+ {"set_credentials", (PyCFunction)pygrpc_Call_set_credentials, METH_KEYWORDS,
+ ""},
{NULL}
};
const char pygrpc_Call_doc[] = "See grpc._adapter._types.Call.";
@@ -169,3 +171,16 @@ PyObject *pygrpc_Call_peer(Call *self) {
gpr_free(peer);
return py_peer;
}
+PyObject *pygrpc_Call_set_credentials(Call *self, PyObject *args,
+ PyObject *kwargs) {
+ ClientCredentials *creds;
+ grpc_call_error errcode;
+ static char *keywords[] = {"creds", NULL};
+ if (!PyArg_ParseTupleAndKeywords(
+ args, kwargs, "O!:set_credentials", keywords,
+ &pygrpc_ClientCredentials_type, &creds)) {
+ return NULL;
+ }
+ errcode = grpc_call_set_credentials(self->c_call, creds->c_creds);
+ return PyInt_FromLong(errcode);
+}
diff --git a/src/python/grpcio/grpc/_adapter/_intermediary_low.py b/src/python/grpcio/grpc/_adapter/_intermediary_low.py
index 06358e72bc..e2feec6ffb 100644
--- a/src/python/grpcio/grpc/_adapter/_intermediary_low.py
+++ b/src/python/grpcio/grpc/_adapter/_intermediary_low.py
@@ -59,6 +59,7 @@ from grpc._adapter import _types
_IGNORE_ME_TAG = object()
Code = _types.StatusCode
+WriteFlags = _types.OpWriteFlags
class Status(collections.namedtuple('Status', ['code', 'details'])):
@@ -125,9 +126,9 @@ class Call(object):
], _TagAdapter(finish_tag, Event.Kind.FINISH))
return err0 if err0 != _types.CallError.OK else err1 if err1 != _types.CallError.OK else err2 if err2 != _types.CallError.OK else _types.CallError.OK
- def write(self, message, tag):
+ def write(self, message, tag, flags):
return self._internal.start_batch([
- _types.OpArgs.send_message(message, 0)
+ _types.OpArgs.send_message(message, flags)
], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED))
def complete(self, tag):
@@ -163,6 +164,12 @@ class Call(object):
def cancel(self):
return self._internal.cancel()
+ def peer(self):
+ return self._internal.peer()
+
+ def set_credentials(self, creds):
+ return self._internal.set_credentials(creds._internal)
+
class Channel(object):
"""Adapter from old _low.Channel interface to new _low.Channel."""
diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py
index 3859ebb0e2..70ceb2a911 100644
--- a/src/python/grpcio/grpc/_adapter/_low.py
+++ b/src/python/grpcio/grpc/_adapter/_low.py
@@ -78,6 +78,9 @@ class Call(_types.Call):
def peer(self):
return self.call.peer()
+ def set_credentials(self, creds):
+ return self.call.set_credentials(creds)
+
class Channel(_types.Channel):
diff --git a/src/python/grpcio/grpc/_adapter/fore.py b/src/python/grpcio/grpc/_adapter/fore.py
index daa41e8bde..acdd69c420 100644
--- a/src/python/grpcio/grpc/_adapter/fore.py
+++ b/src/python/grpcio/grpc/_adapter/fore.py
@@ -56,7 +56,7 @@ class _LowWrite(enum.Enum):
def _write(call, rpc_state, payload):
serialized_payload = rpc_state.serializer(payload)
if rpc_state.write.low is _LowWrite.OPEN:
- call.write(serialized_payload, call)
+ call.write(serialized_payload, call, 0)
rpc_state.write.low = _LowWrite.ACTIVE
else:
rpc_state.write.pending.append(serialized_payload)
@@ -164,7 +164,7 @@ class ForeLink(base_interfaces.ForeLink, activated.Activated):
if rpc_state.write.pending:
serialized_payload = rpc_state.write.pending.pop(0)
- call.write(serialized_payload, call)
+ call.write(serialized_payload, call, 0)
elif rpc_state.write.high is _common.HighWrite.CLOSED:
_status(call, rpc_state)
else:
diff --git a/src/python/grpcio/grpc/_adapter/rear.py b/src/python/grpcio/grpc/_adapter/rear.py
index fd6f45f7a7..17fa47f746 100644
--- a/src/python/grpcio/grpc/_adapter/rear.py
+++ b/src/python/grpcio/grpc/_adapter/rear.py
@@ -78,7 +78,7 @@ class _RPCState(object):
def _write(operation_id, call, outstanding, write_state, serialized_payload):
if write_state.low is _LowWrite.OPEN:
- call.write(serialized_payload, operation_id)
+ call.write(serialized_payload, operation_id, 0)
outstanding.add(_low.Event.Kind.WRITE_ACCEPTED)
write_state.low = _LowWrite.ACTIVE
elif write_state.low is _LowWrite.ACTIVE:
@@ -144,7 +144,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated):
if event.write_accepted:
if rpc_state.common.write.pending:
rpc_state.call.write(
- rpc_state.common.write.pending.pop(0), operation_id)
+ rpc_state.common.write.pending.pop(0), operation_id, 0)
rpc_state.outstanding.add(_low.Event.Kind.WRITE_ACCEPTED)
elif rpc_state.common.write.high is _common.HighWrite.CLOSED:
rpc_state.call.complete(operation_id)
@@ -263,7 +263,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated):
low_state = _LowWrite.OPEN
else:
serialized_payload = request_serializer(payload)
- call.write(serialized_payload, operation_id)
+ call.write(serialized_payload, operation_id, 0)
outstanding.add(_low.Event.Kind.WRITE_ACCEPTED)
low_state = _LowWrite.ACTIVE
diff --git a/src/python/grpcio/grpc/_links/_constants.py b/src/python/grpcio/grpc/_links/_constants.py
new file mode 100644
index 0000000000..117fc5a639
--- /dev/null
+++ b/src/python/grpcio/grpc/_links/_constants.py
@@ -0,0 +1,42 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Constants for use within this package."""
+
+from grpc._adapter import _intermediary_low
+from grpc.beta import interfaces as beta_interfaces
+
+LOW_STATUS_CODE_TO_HIGH_STATUS_CODE = {
+ low: high for low, high in zip(
+ _intermediary_low.Code, beta_interfaces.StatusCode)
+}
+
+HIGH_STATUS_CODE_TO_LOW_STATUS_CODE = {
+ high: low for low, high in LOW_STATUS_CODE_TO_HIGH_STATUS_CODE.items()
+}
diff --git a/src/python/grpcio/grpc/_links/invocation.py b/src/python/grpcio/grpc/_links/invocation.py
index 1676fe7941..67ef86a176 100644
--- a/src/python/grpcio/grpc/_links/invocation.py
+++ b/src/python/grpcio/grpc/_links/invocation.py
@@ -36,6 +36,8 @@ import threading
import time
from grpc._adapter import _intermediary_low
+from grpc._links import _constants
+from grpc.beta import interfaces as beta_interfaces
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
from grpc.framework.foundation import relay
@@ -72,11 +74,28 @@ class _LowWrite(enum.Enum):
CLOSED = 'CLOSED'
+class _Context(beta_interfaces.GRPCInvocationContext):
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._disable_next_compression = False
+
+ def disable_next_request_compression(self):
+ with self._lock:
+ self._disable_next_compression = True
+
+ def next_compression_disabled(self):
+ with self._lock:
+ disabled = self._disable_next_compression
+ self._disable_next_compression = False
+ return disabled
+
+
class _RPCState(object):
def __init__(
self, call, request_serializer, response_deserializer, sequence_number,
- read, allowance, high_write, low_write, due):
+ read, allowance, high_write, low_write, due, context):
self.call = call
self.request_serializer = request_serializer
self.response_deserializer = response_deserializer
@@ -86,6 +105,7 @@ class _RPCState(object):
self.high_write = high_write
self.low_write = low_write
self.due = due
+ self.context = context
def _no_longer_due(kind, rpc_state, key, rpc_states):
@@ -168,14 +188,17 @@ class _Kernel(object):
termination = links.Ticket.Termination.CANCELLATION
elif event.status.code is _intermediary_low.Code.DEADLINE_EXCEEDED:
termination = links.Ticket.Termination.EXPIRATION
+ elif event.status.code is _intermediary_low.Code.UNIMPLEMENTED:
+ termination = links.Ticket.Termination.REMOTE_FAILURE
elif event.status.code is _intermediary_low.Code.UNKNOWN:
termination = links.Ticket.Termination.LOCAL_FAILURE
else:
termination = links.Ticket.Termination.TRANSMISSION_FAILURE
+ code = _constants.LOW_STATUS_CODE_TO_HIGH_STATUS_CODE[event.status.code]
ticket = links.Ticket(
operation_id, rpc_state.sequence_number, None, None, None, None, None,
- None, None, event.metadata, event.status.code, event.status.details,
- termination, None)
+ None, None, event.metadata, code, event.status.details, termination,
+ None)
rpc_state.sequence_number += 1
self._relay.add_value(ticket)
@@ -205,7 +228,7 @@ class _Kernel(object):
def _invoke(
self, operation_id, group, method, initial_metadata, payload, termination,
- timeout, allowance):
+ timeout, allowance, options):
"""Invoke an RPC.
Args:
@@ -220,6 +243,7 @@ class _Kernel(object):
timeout: A duration of time in seconds to allow for the RPC.
allowance: The number of payloads (beyond the free first one) that the
local ticket exchange mate has granted permission to be read.
+ options: A beta_interfaces.GRPCCallOptions value or None.
"""
if termination is links.Ticket.Termination.COMPLETION:
high_write = _HighWrite.CLOSED
@@ -237,6 +261,8 @@ class _Kernel(object):
call = _intermediary_low.Call(
self._channel, self._completion_queue, '/%s/%s' % (group, method),
self._host, time.time() + timeout)
+ if options is not None and options.credentials is not None:
+ call.set_credentials(options.credentials._intermediary_low_credentials)
if transformed_initial_metadata is not None:
for metadata_key, metadata_value in transformed_initial_metadata:
call.add_metadata(metadata_key, metadata_value)
@@ -250,17 +276,33 @@ class _Kernel(object):
low_write = _LowWrite.OPEN
due = set((_METADATA, _FINISH,))
else:
- call.write(request_serializer(payload), operation_id)
+ if options is not None and options.disable_compression:
+ flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS
+ else:
+ flags = 0
+ call.write(request_serializer(payload), operation_id, flags)
low_write = _LowWrite.ACTIVE
due = set((_WRITE, _METADATA, _FINISH,))
+ context = _Context()
self._rpc_states[operation_id] = _RPCState(
- call, request_serializer, response_deserializer, 0,
+ call, request_serializer, response_deserializer, 1,
_Read.AWAITING_METADATA, 1 if allowance is None else (1 + allowance),
- high_write, low_write, due)
+ high_write, low_write, due, context)
+ protocol = links.Protocol(links.Protocol.Kind.INVOCATION_CONTEXT, context)
+ ticket = links.Ticket(
+ operation_id, 0, None, None, None, None, None, None, None, None, None,
+ None, None, protocol)
+ self._relay.add_value(ticket)
def _advance(self, operation_id, rpc_state, payload, termination, allowance):
if payload is not None:
- rpc_state.call.write(rpc_state.request_serializer(payload), operation_id)
+ disable_compression = rpc_state.context.next_compression_disabled()
+ if disable_compression:
+ flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS
+ else:
+ flags = 0
+ rpc_state.call.write(
+ rpc_state.request_serializer(payload), operation_id, flags)
rpc_state.low_write = _LowWrite.ACTIVE
rpc_state.due.add(_WRITE)
@@ -288,10 +330,15 @@ class _Kernel(object):
if self._completion_queue is None:
logging.error('Received invocation ticket %s after stop!', ticket)
else:
+ if (ticket.protocol is not None and
+ ticket.protocol.kind is links.Protocol.Kind.CALL_OPTION):
+ grpc_call_options = ticket.protocol.value
+ else:
+ grpc_call_options = None
self._invoke(
ticket.operation_id, ticket.group, ticket.method,
ticket.initial_metadata, ticket.payload, ticket.termination,
- ticket.timeout, ticket.allowance)
+ ticket.timeout, ticket.allowance, grpc_call_options)
else:
rpc_state = self._rpc_states.get(ticket.operation_id)
if rpc_state is not None:
diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py
index 94e7cfc716..f56df84007 100644
--- a/src/python/grpcio/grpc/_links/service.py
+++ b/src/python/grpcio/grpc/_links/service.py
@@ -36,6 +36,8 @@ import threading
import time
from grpc._adapter import _intermediary_low
+from grpc._links import _constants
+from grpc.beta import interfaces as beta_interfaces
from grpc.framework.foundation import logging_pool
from grpc.framework.foundation import relay
from grpc.framework.interfaces.links import links
@@ -88,12 +90,34 @@ class _LowWrite(enum.Enum):
CLOSED = 'CLOSED'
+class _Context(beta_interfaces.GRPCServicerContext):
+
+ def __init__(self, call):
+ self._lock = threading.Lock()
+ self._call = call
+ self._disable_next_compression = False
+
+ def peer(self):
+ with self._lock:
+ return self._call.peer()
+
+ def disable_next_response_compression(self):
+ with self._lock:
+ self._disable_next_compression = True
+
+ def next_compression_disabled(self):
+ with self._lock:
+ disabled = self._disable_next_compression
+ self._disable_next_compression = False
+ return disabled
+
+
class _RPCState(object):
def __init__(
self, request_deserializer, response_serializer, sequence_number, read,
early_read, allowance, high_write, low_write, premetadataed,
- terminal_metadata, code, message, due):
+ terminal_metadata, code, message, due, context):
self.request_deserializer = request_deserializer
self.response_serializer = response_serializer
self.sequence_number = sequence_number
@@ -109,6 +133,7 @@ class _RPCState(object):
self.code = code
self.message = message
self.due = due
+ self.context = context
def _no_longer_due(kind, rpc_state, key, rpc_states):
@@ -122,13 +147,13 @@ def _metadatafy(call, metadata):
call.add_metadata(metadata_key, metadata_value)
-def _status(termination_kind, code, details):
- effective_details = b'' if details is None else details
- if code is None:
- effective_code = _TERMINATION_KIND_TO_CODE[termination_kind]
+def _status(termination_kind, high_code, details):
+ low_details = b'' if details is None else details
+ if high_code is None:
+ low_code = _TERMINATION_KIND_TO_CODE[termination_kind]
else:
- effective_code = code
- return _intermediary_low.Status(effective_code, effective_details)
+ low_code = _constants.HIGH_STATUS_CODE_TO_LOW_STATUS_CODE[high_code]
+ return _intermediary_low.Status(low_code, low_details)
class _Kernel(object):
@@ -162,14 +187,16 @@ class _Kernel(object):
(group, method), _IDENTITY)
call.read(call)
+ context = _Context(call)
self._rpc_states[call] = _RPCState(
request_deserializer, response_serializer, 1, _Read.READING, None, 1,
_HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None,
- set((_READ, _FINISH,)))
+ set((_READ, _FINISH,)), context)
+ protocol = links.Protocol(links.Protocol.Kind.SERVICER_CONTEXT, context)
ticket = links.Ticket(
call, 0, group, method, links.Ticket.Subscription.FULL,
service_acceptance.deadline - time.time(), None, event.metadata, None,
- None, None, None, None, 'TODO: Service Context Object!')
+ None, None, None, None, protocol)
self._relay.add_value(ticket)
def _on_read_event(self, event):
@@ -310,7 +337,12 @@ class _Kernel(object):
self._relay.add_value(early_read_ticket)
if ticket.payload is not None:
- call.write(rpc_state.response_serializer(ticket.payload), call)
+ disable_compression = rpc_state.context.next_compression_disabled()
+ if disable_compression:
+ flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS
+ else:
+ flags = 0
+ call.write(rpc_state.response_serializer(ticket.payload), call, flags)
rpc_state.due.add(_WRITE)
rpc_state.low_write = _LowWrite.ACTIVE
diff --git a/src/python/grpcio/grpc/beta/_connectivity_channel.py b/src/python/grpcio/grpc/beta/_connectivity_channel.py
index 457ede79f2..61674a70ad 100644
--- a/src/python/grpcio/grpc/beta/_connectivity_channel.py
+++ b/src/python/grpcio/grpc/beta/_connectivity_channel.py
@@ -33,18 +33,24 @@ import threading
import time
from grpc._adapter import _low
+from grpc._adapter import _types
+from grpc.beta import interfaces
from grpc.framework.foundation import callable_util
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')
+_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
+ state: connectivity for state, connectivity in zip(
+ _types.ConnectivityState, interfaces.ChannelConnectivity)
+}
+
class ConnectivityChannel(object):
- def __init__(self, low_channel, mapping):
+ def __init__(self, low_channel):
self._lock = threading.Lock()
self._low_channel = low_channel
- self._mapping = mapping
self._polling = False
self._connectivity = None
@@ -88,7 +94,8 @@ class ConnectivityChannel(object):
try_to_connect = initial_try_to_connect
low_connectivity = low_channel.check_connectivity_state(try_to_connect)
with self._lock:
- self._connectivity = self._mapping[low_connectivity]
+ self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
+ low_connectivity]
callbacks = tuple(
callback for callback, unused_but_known_to_be_none_connectivity
in self._callbacks_and_connectivities)
@@ -112,7 +119,8 @@ class ConnectivityChannel(object):
if event.success or try_to_connect:
low_connectivity = low_channel.check_connectivity_state(try_to_connect)
with self._lock:
- self._connectivity = self._mapping[low_connectivity]
+ self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
+ low_connectivity]
if not self._delivering:
callbacks = self._deliveries(self._connectivity)
if callbacks:
diff --git a/src/python/grpcio/grpc/beta/_server.py b/src/python/grpcio/grpc/beta/_server.py
index 4e46ffd17f..05b954d186 100644
--- a/src/python/grpcio/grpc/beta/_server.py
+++ b/src/python/grpcio/grpc/beta/_server.py
@@ -32,9 +32,11 @@
import threading
from grpc._links import service
+from grpc.beta import interfaces
from grpc.framework.core import implementations as _core_implementations
from grpc.framework.crust import implementations as _crust_implementations
from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.links import utilities
_DEFAULT_POOL_SIZE = 8
@@ -42,6 +44,23 @@ _DEFAULT_TIMEOUT = 300
_MAXIMUM_TIMEOUT = 24 * 60 * 60
+class _GRPCServicer(base.Servicer):
+
+ def __init__(self, delegate):
+ self._delegate = delegate
+
+ def service(self, group, method, context, output_operator):
+ try:
+ return self._delegate.service(group, method, context, output_operator)
+ except base.NoSuchMethodError as e:
+ if e.code is None and e.details is None:
+ raise base.NoSuchMethodError(
+ interfaces.StatusCode.UNIMPLEMENTED,
+ b'Method "%s" of service "%s" not implemented!' % (method, group))
+ else:
+ raise
+
+
def _disassemble(grpc_link, end_link, pool, event, grace):
grpc_link.begin_stop()
end_link.stop(grace).wait()
@@ -53,7 +72,7 @@ def _disassemble(grpc_link, end_link, pool, event, grace):
event.set()
-class Server(object):
+class Server(interfaces.Server):
def __init__(self, grpc_link, end_link, pool):
self._grpc_link = grpc_link
@@ -63,17 +82,17 @@ class Server(object):
def add_insecure_port(self, address):
return self._grpc_link.add_port(address, None)
- def add_secure_port(self, address, intermediary_low_server_credentials):
+ def add_secure_port(self, address, server_credentials):
return self._grpc_link.add_port(
- address, intermediary_low_server_credentials)
+ address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access
- def start(self):
+ def _start(self):
self._grpc_link.join_link(self._end_link)
self._end_link.join_link(self._grpc_link)
self._grpc_link.start()
self._end_link.start()
- def stop(self, grace):
+ def _stop(self, grace):
stop_event = threading.Event()
if 0 < grace:
disassembly_thread = threading.Thread(
@@ -86,6 +105,20 @@ class Server(object):
_disassemble(self._grpc_link, self._end_link, self._pool, stop_event, 0)
return stop_event
+ def start(self):
+ self._start()
+
+ def stop(self, grace):
+ return self._stop(grace)
+
+ def __enter__(self):
+ self._start()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._stop(0).wait()
+ return False
+
def server(
implementations, multi_implementation, request_deserializers,
@@ -99,8 +132,9 @@ def server(
service_thread_pool = thread_pool
assembly_thread_pool = None
- servicer = _crust_implementations.servicer(
- implementations, multi_implementation, service_thread_pool)
+ servicer = _GRPCServicer(
+ _crust_implementations.servicer(
+ implementations, multi_implementation, service_thread_pool))
grpc_link = service.service_link(request_deserializers, response_serializers)
diff --git a/src/python/grpcio/grpc/beta/_stub.py b/src/python/grpcio/grpc/beta/_stub.py
index cfbecb852b..11dab889cd 100644
--- a/src/python/grpcio/grpc/beta/_stub.py
+++ b/src/python/grpcio/grpc/beta/_stub.py
@@ -49,6 +49,12 @@ class _AutoIntermediary(object):
def __getattr__(self, attr):
return getattr(self._delegate, attr)
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return False
+
def __del__(self):
self._on_deletion()
diff --git a/src/python/grpcio/grpc/beta/beta.py b/src/python/grpcio/grpc/beta/implementations.py
index b3a161087f..9b461fb3dd 100644
--- a/src/python/grpcio/grpc/beta/beta.py
+++ b/src/python/grpcio/grpc/beta/implementations.py
@@ -40,6 +40,7 @@ from grpc._adapter import _types
from grpc.beta import _connectivity_channel
from grpc.beta import _server
from grpc.beta import _stub
+from grpc.beta import interfaces
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.interfaces.face import face # pylint: disable=unused-import
@@ -47,32 +48,6 @@ _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')
-@enum.unique
-class ChannelConnectivity(enum.Enum):
- """Mirrors grpc_connectivity_state in the gRPC Core.
-
- Attributes:
- IDLE: The channel is idle.
- CONNECTING: The channel is connecting.
- READY: The channel is ready to conduct RPCs.
- TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
- recover.
- FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
- """
-
- IDLE = (_types.ConnectivityState.IDLE, 'idle',)
- CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',)
- READY = (_types.ConnectivityState.READY, 'ready',)
- TRANSIENT_FAILURE = (
- _types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',)
- FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',)
-
-_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
- state: connectivity for state, connectivity in zip(
- _types.ConnectivityState, ChannelConnectivity)
-}
-
-
class ClientCredentials(object):
"""A value encapsulating the data required to create a secure Channel.
@@ -118,13 +93,14 @@ class Channel(object):
self._low_channel = low_channel
self._intermediary_low_channel = intermediary_low_channel
self._connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel, _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)
+ low_channel)
def subscribe(self, callback, try_to_connect=None):
"""Subscribes to this Channel's connectivity.
Args:
- callback: A callable to be invoked and passed this Channel's connectivity.
+ callback: A callable to be invoked and passed an
+ interfaces.ChannelConnectivity identifying this Channel's connectivity.
The callable will be invoked immediately upon subscription and again for
every change to this Channel's connectivity thereafter until it is
unsubscribed.
@@ -144,7 +120,7 @@ class Channel(object):
self._connectivity_channel.unsubscribe(callback)
-def create_insecure_channel(host, port):
+def insecure_channel(host, port):
"""Creates an insecure Channel to a remote host.
Args:
@@ -159,7 +135,7 @@ def create_insecure_channel(host, port):
return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access
-def create_secure_channel(host, port, client_credentials):
+def secure_channel(host, port, client_credentials):
"""Creates a secure Channel to a remote host.
Args:
@@ -313,86 +289,6 @@ def ssl_server_credentials(
intermediary_low_credentials._internal, intermediary_low_credentials) # pylint: disable=protected-access
-class Server(object):
- """Services RPCs."""
- __metaclass__ = abc.ABCMeta
-
- @abc.abstractmethod
- def add_insecure_port(self, address):
- """Reserves a port for insecure RPC service once this Server becomes active.
-
- This method may only be called before calling this Server's start method is
- called.
-
- Args:
- address: The address for which to open a port.
-
- Returns:
- An integer port on which RPCs will be serviced after this link has been
- started. This is typically the same number as the port number contained
- in the passed address, but will likely be different if the port number
- contained in the passed address was zero.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def add_secure_port(self, address, server_credentials):
- """Reserves a port for secure RPC service after this Server becomes active.
-
- This method may only be called before calling this Server's start method is
- called.
-
- Args:
- address: The address for which to open a port.
- server_credentials: A ServerCredentials.
-
- Returns:
- An integer port on which RPCs will be serviced after this link has been
- started. This is typically the same number as the port number contained
- in the passed address, but will likely be different if the port number
- contained in the passed address was zero.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def start(self):
- """Starts this Server's service of RPCs.
-
- This method may only be called while the server is not serving RPCs (i.e. it
- is not idempotent).
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def stop(self, grace):
- """Stops this Server's service of RPCs.
-
- All calls to this method immediately stop service of new RPCs. When existing
- RPCs are aborted is controlled by the grace period parameter passed to this
- method.
-
- This method may be called at any time and is idempotent. Passing a smaller
- grace value than has been passed in a previous call will have the effect of
- stopping the Server sooner. Passing a larger grace value than has been
- passed in a previous call will not have the effect of stopping the sooner
- later.
-
- Args:
- grace: A duration of time in seconds to allow existing RPCs to complete
- before being aborted by this Server's stopping. May be zero for
- immediate abortion of all in-progress RPCs.
-
- Returns:
- A threading.Event that will be set when this Server has completely
- stopped. The returned event may not be set until after the full grace
- period (if some ongoing RPC continues for the full length of the period)
- of it may be set much sooner (such as if this Server had no RPCs underway
- at the time it was stopped or if all RPCs that it had underway completed
- very early in the grace period).
- """
- raise NotImplementedError()
-
-
class ServerOptions(object):
"""A value encapsulating the various options for creation of a Server.
@@ -450,27 +346,8 @@ def server_options(
thread_pool, thread_pool_size, default_timeout, maximum_timeout)
-class _Server(Server):
-
- def __init__(self, underserver):
- self._underserver = underserver
-
- def add_insecure_port(self, address):
- return self._underserver.add_insecure_port(address)
-
- def add_secure_port(self, address, server_credentials):
- return self._underserver.add_secure_port(
- address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access
-
- def start(self):
- self._underserver.start()
-
- def stop(self, grace):
- return self._underserver.stop(grace)
-
-
def server(service_implementations, options=None):
- """Creates a Server with which RPCs can be serviced.
+ """Creates an interfaces.Server with which RPCs can be serviced.
Args:
service_implementations: A dictionary from service name-method name pair to
@@ -479,13 +356,12 @@ def server(service_implementations, options=None):
functionality of the returned Server.
Returns:
- A Server with which RPCs can be serviced.
+ An interfaces.Server with which RPCs can be serviced.
"""
effective_options = _EMPTY_SERVER_OPTIONS if options is None else options
- underserver = _server.server(
+ return _server.server(
service_implementations, effective_options.multi_method_implementation,
effective_options.request_deserializers,
effective_options.response_serializers, effective_options.thread_pool,
effective_options.thread_pool_size, effective_options.default_timeout,
effective_options.maximum_timeout)
- return _Server(underserver)
diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py
new file mode 100644
index 0000000000..07c8618f70
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/interfaces.py
@@ -0,0 +1,214 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Constants and interfaces of the Beta API of gRPC Python."""
+
+import abc
+import enum
+
+from grpc._adapter import _types
+
+
+@enum.unique
+class ChannelConnectivity(enum.Enum):
+ """Mirrors grpc_connectivity_state in the gRPC Core.
+
+ Attributes:
+ IDLE: The channel is idle.
+ CONNECTING: The channel is connecting.
+ READY: The channel is ready to conduct RPCs.
+ TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
+ recover.
+ FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
+ """
+ IDLE = (_types.ConnectivityState.IDLE, 'idle',)
+ CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',)
+ READY = (_types.ConnectivityState.READY, 'ready',)
+ TRANSIENT_FAILURE = (
+ _types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',)
+ FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',)
+
+
+@enum.unique
+class StatusCode(enum.Enum):
+ """Mirrors grpc_status_code in the C core."""
+ OK = 0
+ CANCELLED = 1
+ UNKNOWN = 2
+ INVALID_ARGUMENT = 3
+ DEADLINE_EXCEEDED = 4
+ NOT_FOUND = 5
+ ALREADY_EXISTS = 6
+ PERMISSION_DENIED = 7
+ RESOURCE_EXHAUSTED = 8
+ FAILED_PRECONDITION = 9
+ ABORTED = 10
+ OUT_OF_RANGE = 11
+ UNIMPLEMENTED = 12
+ INTERNAL = 13
+ UNAVAILABLE = 14
+ DATA_LOSS = 15
+ UNAUTHENTICATED = 16
+
+
+class GRPCCallOptions(object):
+ """A value encapsulating gRPC-specific options passed on RPC invocation.
+
+ This class and its instances have no supported interface - it exists to
+ define the type of its instances and its instances exist to be passed to
+ other functions.
+ """
+
+ def __init__(self, disable_compression, subcall_of, credentials):
+ self.disable_compression = disable_compression
+ self.subcall_of = subcall_of
+ self.credentials = credentials
+
+
+def grpc_call_options(disable_compression=False, credentials=None):
+ """Creates a GRPCCallOptions value to be passed at RPC invocation.
+
+ All parameters are optional and should always be passed by keyword.
+
+ Args:
+ disable_compression: A boolean indicating whether or not compression should
+ be disabled for the request object of the RPC. Only valid for
+ request-unary RPCs.
+ credentials: A ClientCredentials object to use for the invoked RPC.
+ """
+ return GRPCCallOptions(disable_compression, None, credentials)
+
+
+class GRPCServicerContext(object):
+ """Exposes gRPC-specific options and behaviors to code servicing RPCs."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def peer(self):
+ """Identifies the peer that invoked the RPC being serviced.
+
+ Returns:
+ A string identifying the peer that invoked the RPC being serviced.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def disable_next_response_compression(self):
+ """Disables compression of the next response passed by the application."""
+ raise NotImplementedError()
+
+
+class GRPCInvocationContext(object):
+ """Exposes gRPC-specific options and behaviors to code invoking RPCs."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def disable_next_request_compression(self):
+ """Disables compression of the next request passed by the application."""
+ raise NotImplementedError()
+
+
+class Server(object):
+ """Services RPCs."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def add_insecure_port(self, address):
+ """Reserves a port for insecure RPC service once this Server becomes active.
+
+ This method may only be called before calling this Server's start method is
+ called.
+
+ Args:
+ address: The address for which to open a port.
+
+ Returns:
+ An integer port on which RPCs will be serviced after this link has been
+ started. This is typically the same number as the port number contained
+ in the passed address, but will likely be different if the port number
+ contained in the passed address was zero.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_secure_port(self, address, server_credentials):
+ """Reserves a port for secure RPC service after this Server becomes active.
+
+ This method may only be called before calling this Server's start method is
+ called.
+
+ Args:
+ address: The address for which to open a port.
+ server_credentials: A ServerCredentials.
+
+ Returns:
+ An integer port on which RPCs will be serviced after this link has been
+ started. This is typically the same number as the port number contained
+ in the passed address, but will likely be different if the port number
+ contained in the passed address was zero.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def start(self):
+ """Starts this Server's service of RPCs.
+
+ This method may only be called while the server is not serving RPCs (i.e. it
+ is not idempotent).
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stop(self, grace):
+ """Stops this Server's service of RPCs.
+
+ All calls to this method immediately stop service of new RPCs. When existing
+ RPCs are aborted is controlled by the grace period parameter passed to this
+ method.
+
+ This method may be called at any time and is idempotent. Passing a smaller
+ grace value than has been passed in a previous call will have the effect of
+ stopping the Server sooner. Passing a larger grace value than has been
+ passed in a previous call will not have the effect of stopping the sooner
+ later.
+
+ Args:
+ grace: A duration of time in seconds to allow existing RPCs to complete
+ before being aborted by this Server's stopping. May be zero for
+ immediate abortion of all in-progress RPCs.
+
+ Returns:
+ A threading.Event that will be set when this Server has completely
+ stopped. The returned event may not be set until after the full grace
+ period (if some ongoing RPC continues for the full length of the period)
+ of it may be set much sooner (such as if this Server had no RPCs underway
+ at the time it was stopped or if all RPCs that it had underway completed
+ very early in the grace period).
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/beta/utilities.py b/src/python/grpcio/grpc/beta/utilities.py
index 1b5356e3ad..fb07a76579 100644
--- a/src/python/grpcio/grpc/beta/utilities.py
+++ b/src/python/grpcio/grpc/beta/utilities.py
@@ -32,7 +32,9 @@
import threading
import time
-from grpc.beta import beta
+# implementations is referenced from specification in this module.
+from grpc.beta import implementations # pylint: disable=unused-import
+from grpc.beta import interfaces
from grpc.framework.foundation import callable_util
from grpc.framework.foundation import future
@@ -70,7 +72,8 @@ class _ChannelReadyFuture(future.Future):
def _update(self, connectivity):
with self._condition:
- if not self._cancelled and connectivity is beta.ChannelConnectivity.READY:
+ if (not self._cancelled and
+ connectivity is interfaces.ChannelConnectivity.READY):
self._matured = True
self._channel.unsubscribe(self._update)
self._condition.notify_all()
@@ -141,19 +144,19 @@ class _ChannelReadyFuture(future.Future):
def channel_ready_future(channel):
- """Creates a future.Future that matures when a beta.Channel is ready.
+ """Creates a future.Future tracking when an implementations.Channel is ready.
- Cancelling the returned future.Future does not tell the given beta.Channel to
- abandon attempts it may have been making to connect; cancelling merely
- deactivates the return future.Future's subscription to the given
- beta.Channel's connectivity.
+ Cancelling the returned future.Future does not tell the given
+ implementations.Channel to abandon attempts it may have been making to
+ connect; cancelling merely deactivates the return future.Future's
+ subscription to the given implementations.Channel's connectivity.
Args:
- channel: A beta.Channel.
+ channel: An implementations.Channel.
Returns:
A future.Future that matures when the given Channel has connectivity
- beta.ChannelConnectivity.READY.
+ interfaces.ChannelConnectivity.READY.
"""
ready_future = _ChannelReadyFuture(channel)
ready_future.start()
diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py
index d3be3a4c4a..0f47cb48e0 100644
--- a/src/python/grpcio/grpc/framework/core/_constants.py
+++ b/src/python/grpcio/grpc/framework/core/_constants.py
@@ -44,14 +44,15 @@ TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = {
# ticket should be sent to the other side in the event of such an
# outcome.
ABORTION_OUTCOME_TO_TICKET_TERMINATION = {
- base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION,
- base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION,
- base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN,
- base.Outcome.REMOTE_SHUTDOWN: None,
- base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE,
- base.Outcome.TRANSMISSION_FAILURE: None,
- base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE,
- base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE,
+ base.Outcome.Kind.CANCELLED: links.Ticket.Termination.CANCELLATION,
+ base.Outcome.Kind.EXPIRED: links.Ticket.Termination.EXPIRATION,
+ base.Outcome.Kind.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN,
+ base.Outcome.Kind.REMOTE_SHUTDOWN: None,
+ base.Outcome.Kind.RECEPTION_FAILURE:
+ links.Ticket.Termination.RECEPTION_FAILURE,
+ base.Outcome.Kind.TRANSMISSION_FAILURE: None,
+ base.Outcome.Kind.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE,
+ base.Outcome.Kind.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE,
}
INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:'
diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py
index 76b3534530..a346e9d478 100644
--- a/src/python/grpcio/grpc/framework/core/_context.py
+++ b/src/python/grpcio/grpc/framework/core/_context.py
@@ -33,6 +33,7 @@ import time
# _interfaces is referenced from specification in this module.
from grpc.framework.core import _interfaces # pylint: disable=unused-import
+from grpc.framework.core import _utilities
from grpc.framework.interfaces.base import base
@@ -56,11 +57,12 @@ class OperationContext(base.OperationContext):
self._transmission_manager = transmission_manager
self._expiration_manager = expiration_manager
- def _abort(self, outcome):
+ def _abort(self, outcome_kind):
with self._lock:
if self._termination_manager.outcome is None:
+ outcome = _utilities.Outcome(outcome_kind, None, None)
self._termination_manager.abort(outcome)
- self._transmission_manager.abort(outcome, None, None)
+ self._transmission_manager.abort(outcome)
self._expiration_manager.terminate()
def outcome(self):
@@ -85,8 +87,8 @@ class OperationContext(base.OperationContext):
def cancel(self):
"""See base.OperationContext.cancel for specification."""
- self._abort(base.Outcome.CANCELLED)
+ self._abort(base.Outcome.Kind.CANCELLED)
def fail(self, exception):
"""See base.OperationContext.fail for specification."""
- self._abort(base.Outcome.LOCAL_FAILURE)
+ self._abort(base.Outcome.Kind.LOCAL_FAILURE)
diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py
index 2d7b2e2f10..8ab59dc3e5 100644
--- a/src/python/grpcio/grpc/framework/core/_emission.py
+++ b/src/python/grpcio/grpc/framework/core/_emission.py
@@ -30,6 +30,7 @@
"""State and behavior for handling emitted values."""
from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
from grpc.framework.interfaces.base import base
@@ -81,9 +82,10 @@ class EmissionManager(_interfaces.EmissionManager):
payload_present and self._completion_seen or
completion_present and self._completion_seen or
allowance_present and allowance <= 0):
- self._termination_manager.abort(base.Outcome.LOCAL_FAILURE)
- self._transmission_manager.abort(
- base.Outcome.LOCAL_FAILURE, None, None)
+ outcome = _utilities.Outcome(
+ base.Outcome.Kind.LOCAL_FAILURE, None, None)
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
self._expiration_manager.terminate()
else:
self._initial_metadata_seen |= initial_metadata_present
diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py
index f57cde4e58..8e07d9061e 100644
--- a/src/python/grpcio/grpc/framework/core/_end.py
+++ b/src/python/grpcio/grpc/framework/core/_end.py
@@ -69,7 +69,7 @@ class _Cycle(object):
def _abort(operations):
for operation in operations:
- operation.abort(base.Outcome.LOCAL_SHUTDOWN)
+ operation.abort(base.Outcome.Kind.LOCAL_SHUTDOWN)
def _cancel_futures(futures):
@@ -90,19 +90,19 @@ def _termination_action(lock, stats, operation_id, cycle):
Args:
lock: A lock to hold during the termination action.
- states: A mapping from base.Outcome values to integers to increment with
- the outcome given to the termination action.
+ stats: A mapping from base.Outcome.Kind values to integers to increment
+ with the outcome kind given to the termination action.
operation_id: The operation ID for the termination action.
cycle: A _Cycle value to be updated during the termination action.
Returns:
- A callable that takes an operation outcome as its sole parameter and that
- should be used as the termination action for the operation associated
- with the given operation ID.
+ A callable that takes an operation outcome kind as its sole parameter and
+ that should be used as the termination action for the operation
+ associated with the given operation ID.
"""
- def termination_action(outcome):
+ def termination_action(outcome_kind):
with lock:
- stats[outcome] += 1
+ stats[outcome_kind] += 1
cycle.operations.pop(operation_id, None)
if not cycle.operations:
for action in cycle.idle_actions:
@@ -127,7 +127,7 @@ class _End(End):
self._lock = threading.Condition()
self._servicer_package = servicer_package
- self._stats = {outcome: 0 for outcome in base.Outcome}
+ self._stats = {outcome_kind: 0 for outcome_kind in base.Outcome.Kind}
self._mate = None
@@ -168,7 +168,7 @@ class _End(End):
def operate(
self, group, method, subscription, timeout, initial_metadata=None,
- payload=None, completion=None):
+ payload=None, completion=None, protocol_options=None):
"""See base.End.operate for specification."""
operation_id = uuid.uuid4()
with self._lock:
@@ -177,9 +177,9 @@ class _End(End):
termination_action = _termination_action(
self._lock, self._stats, operation_id, self._cycle)
operation = _operation.invocation_operate(
- operation_id, group, method, subscription, timeout, initial_metadata,
- payload, completion, self._mate.accept_ticket, termination_action,
- self._cycle.pool)
+ operation_id, group, method, subscription, timeout, protocol_options,
+ initial_metadata, payload, completion, self._mate.accept_ticket,
+ termination_action, self._cycle.pool)
self._cycle.operations[operation_id] = operation
return operation.context, operation.operator
diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py
index d8690b3a02..ded0ab6bce 100644
--- a/src/python/grpcio/grpc/framework/core/_expiration.py
+++ b/src/python/grpcio/grpc/framework/core/_expiration.py
@@ -32,6 +32,7 @@
import time
from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
from grpc.framework.foundation import later
from grpc.framework.interfaces.base import base
@@ -73,7 +74,8 @@ class _ExpirationManager(_interfaces.ExpirationManager):
if self._future is not None and index == self._index:
self._future = None
self._termination_manager.expire()
- self._transmission_manager.abort(base.Outcome.EXPIRED, None, None)
+ self._transmission_manager.abort(
+ _utilities.Outcome(base.Outcome.Kind.EXPIRED, None, None))
return expire
def start(self):
diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py
index 7b8127f3fc..4129a8ce43 100644
--- a/src/python/grpcio/grpc/framework/core/_ingestion.py
+++ b/src/python/grpcio/grpc/framework/core/_ingestion.py
@@ -35,6 +35,7 @@ import enum
from grpc.framework.core import _constants
from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
from grpc.framework.foundation import abandonment
from grpc.framework.foundation import callable_util
from grpc.framework.interfaces.base import base
@@ -46,7 +47,7 @@ _INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
class _SubscriptionCreation(
collections.namedtuple(
'_SubscriptionCreation',
- ('kind', 'subscription', 'code', 'message',))):
+ ('kind', 'subscription', 'code', 'details',))):
"""A sum type for the outcome of ingestion initialization.
Attributes:
@@ -56,7 +57,7 @@ class _SubscriptionCreation(
code: A code value to be sent to the other side of the operation along with
an indication that the operation is being aborted due to an error on the
remote side of the operation. Only present if kind is Kind.REMOTE_ERROR.
- message: A message value to be sent to the other side of the operation
+ details: A details value to be sent to the other side of the operation
along with an indication that the operation is being aborted due to an
error on the remote side of the operation. Only present if kind is
Kind.REMOTE_ERROR.
@@ -114,7 +115,7 @@ class _ServiceSubscriptionCreator(_SubscriptionCreator):
group, method, self._operation_context, self._output_operator)
except base.NoSuchMethodError as e:
return _SubscriptionCreation(
- _SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.message)
+ _SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.details)
except abandonment.Abandoned:
return _SubscriptionCreation(
_SubscriptionCreation.Kind.ABANDONED, None, None, None)
@@ -139,7 +140,7 @@ class _IngestionManager(_interfaces.IngestionManager):
def __init__(
self, lock, pool, subscription, subscription_creator, termination_manager,
- transmission_manager, expiration_manager):
+ transmission_manager, expiration_manager, protocol_manager):
"""Constructor.
Args:
@@ -156,12 +157,14 @@ class _IngestionManager(_interfaces.IngestionManager):
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
+ protocol_manager: The _interfaces.ProtocolManager for the operation.
"""
self._lock = lock
self._pool = pool
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = expiration_manager
+ self._protocol_manager = protocol_manager
if subscription is None:
self._subscription_creator = subscription_creator
@@ -190,11 +193,13 @@ class _IngestionManager(_interfaces.IngestionManager):
self._pending_payloads = None
self._pending_completion = None
- def _abort_and_notify(self, outcome, code, message):
+ def _abort_and_notify(self, outcome_kind, code, details):
self._abort_internal_only()
- self._termination_manager.abort(outcome)
- self._transmission_manager.abort(outcome, code, message)
- self._expiration_manager.terminate()
+ if self._termination_manager.outcome is None:
+ outcome = _utilities.Outcome(outcome_kind, code, details)
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
+ self._expiration_manager.terminate()
def _operator_next(self):
"""Computes the next step for full-subscription ingestion.
@@ -250,12 +255,13 @@ class _IngestionManager(_interfaces.IngestionManager):
else:
with self._lock:
if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
+ self._abort_and_notify(
+ base.Outcome.Kind.LOCAL_FAILURE, None, None)
return
else:
with self._lock:
if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
+ self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None)
return
def _operator_post_create(self, subscription):
@@ -279,18 +285,21 @@ class _IngestionManager(_interfaces.IngestionManager):
if outcome.return_value is None:
with self._lock:
if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
+ self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None)
elif outcome.return_value.kind is _SubscriptionCreation.Kind.ABANDONED:
with self._lock:
if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
+ self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None)
elif outcome.return_value.kind is _SubscriptionCreation.Kind.REMOTE_ERROR:
code = outcome.return_value.code
- message = outcome.return_value.message
+ details = outcome.return_value.details
with self._lock:
if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.REMOTE_FAILURE, code, message)
+ self._abort_and_notify(
+ base.Outcome.Kind.REMOTE_FAILURE, code, details)
elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL:
+ self._protocol_manager.set_protocol_receiver(
+ outcome.return_value.subscription.protocol_receiver)
self._operator_post_create(outcome.return_value.subscription)
else:
# TODO(nathaniel): Support other subscriptions.
@@ -373,7 +382,7 @@ class _IngestionManager(_interfaces.IngestionManager):
def invocation_ingestion_manager(
subscription, lock, pool, termination_manager, transmission_manager,
- expiration_manager):
+ expiration_manager, protocol_manager):
"""Creates an IngestionManager appropriate for invocation-side use.
Args:
@@ -385,18 +394,20 @@ def invocation_ingestion_manager(
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
+ protocol_manager: The _interfaces.ProtocolManager for the operation.
Returns:
An IngestionManager appropriate for invocation-side use.
"""
return _IngestionManager(
lock, pool, subscription, None, termination_manager, transmission_manager,
- expiration_manager)
+ expiration_manager, protocol_manager)
def service_ingestion_manager(
servicer, operation_context, output_operator, lock, pool,
- termination_manager, transmission_manager, expiration_manager):
+ termination_manager, transmission_manager, expiration_manager,
+ protocol_manager):
"""Creates an IngestionManager appropriate for service-side use.
The returned IngestionManager will require its set_group_and_name method to be
@@ -415,6 +426,7 @@ def service_ingestion_manager(
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
+ protocol_manager: The _interfaces.ProtocolManager for the operation.
Returns:
An IngestionManager appropriate for service-side use.
@@ -423,4 +435,4 @@ def service_ingestion_manager(
servicer, operation_context, output_operator)
return _IngestionManager(
lock, pool, None, subscription_creator, termination_manager,
- transmission_manager, expiration_manager)
+ transmission_manager, expiration_manager, protocol_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py
index deb5f34f9b..ffa686b2b7 100644
--- a/src/python/grpcio/grpc/framework/core/_interfaces.py
+++ b/src/python/grpcio/grpc/framework/core/_interfaces.py
@@ -50,13 +50,13 @@ class TerminationManager(object):
If the operation has already terminated the callback will not be called.
Args:
- callback: A callable that will be passed an interfaces.Outcome value.
+ callback: A callable that will be passed a base.Outcome value.
Returns:
None if the operation has not yet terminated and the passed callback will
- be called when it does, or a base.Outcome value describing the operation
- termination if the operation has terminated and the callback will not be
- called as a result of this method call.
+ be called when it does, or a base.Outcome value describing the
+ operation termination if the operation has terminated and the callback
+ will not be called as a result of this method call.
"""
raise NotImplementedError()
@@ -76,8 +76,13 @@ class TerminationManager(object):
raise NotImplementedError()
@abc.abstractmethod
- def reception_complete(self):
- """Indicates that reception from the other side is complete."""
+ def reception_complete(self, code, details):
+ """Indicates that reception from the other side is complete.
+
+ Args:
+ code: An application-specific code value.
+ details: An application-specific details value.
+ """
raise NotImplementedError()
@abc.abstractmethod
@@ -95,7 +100,7 @@ class TerminationManager(object):
"""Indicates that the operation must abort for the indicated reason.
Args:
- outcome: An interfaces.Outcome indicating operation abortion.
+ outcome: A base.Outcome indicating operation abortion.
"""
raise NotImplementedError()
@@ -106,8 +111,8 @@ class TransmissionManager(object):
@abc.abstractmethod
def kick_off(
- self, group, method, timeout, initial_metadata, payload, completion,
- allowance):
+ self, group, method, timeout, protocol_options, initial_metadata,
+ payload, completion, allowance):
"""Transmits the values associated with operation invocation."""
raise NotImplementedError()
@@ -155,19 +160,13 @@ class TransmissionManager(object):
raise NotImplementedError()
@abc.abstractmethod
- def abort(self, outcome, code, message):
+ def abort(self, outcome):
"""Indicates that the operation has aborted.
Args:
- outcome: An interfaces.Outcome for the operation. If None, indicates that
- the operation abortion should not be communicated to the other side of
- the operation.
- code: A code value to communicate to the other side of the operation
- along with indication of operation abortion. May be None, and has no
- effect if outcome is None.
- message: A message value to communicate to the other side of the
- operation along with indication of operation abortion. May be None, and
- has no effect if outcome is None.
+ outcome: A base.Outcome for the operation. If None, indicates that the
+ operation abortion should not be communicated to the other side of the
+ operation.
"""
raise NotImplementedError()
@@ -204,6 +203,31 @@ class ExpirationManager(object):
raise NotImplementedError()
+class ProtocolManager(object):
+ """A manager of protocol-specific values passing through an operation."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_protocol_receiver(self, protocol_receiver):
+ """Registers the customer object that will receive protocol objects.
+
+ Args:
+ protocol_receiver: A base.ProtocolReceiver to which protocol objects for
+ the operation should be passed.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def accept_protocol_context(self, protocol_context):
+ """Accepts the protocol context object for the operation.
+
+ Args:
+ protocol_context: An object designated for use as the protocol context
+ of the operation, with further semantics implementation-determined.
+ """
+ raise NotImplementedError()
+
+
class EmissionManager(base.Operator):
"""A manager of values emitted by customer code."""
__metaclass__ = abc.ABCMeta
@@ -279,8 +303,7 @@ class ReceptionManager(object):
"""Handle a ticket from the other side of the operation.
Args:
- ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket
- appropriate to this end of the operation and this object.
+ ticket: A links.Ticket for the operation.
"""
raise NotImplementedError()
@@ -305,10 +328,10 @@ class Operation(object):
raise NotImplementedError()
@abc.abstractmethod
- def abort(self, outcome):
+ def abort(self, outcome_kind):
"""Aborts the operation.
Args:
- outcome: A base.Outcome value indicating operation abortion.
+ outcome_kind: A base.Outcome.Kind value indicating operation abortion.
"""
raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py
index cc873c03f9..020c0c9ed9 100644
--- a/src/python/grpcio/grpc/framework/core/_operation.py
+++ b/src/python/grpcio/grpc/framework/core/_operation.py
@@ -31,16 +31,16 @@
import threading
-# _utilities is referenced from specification in this module.
from grpc.framework.core import _context
from grpc.framework.core import _emission
from grpc.framework.core import _expiration
from grpc.framework.core import _ingestion
from grpc.framework.core import _interfaces
+from grpc.framework.core import _protocol
from grpc.framework.core import _reception
from grpc.framework.core import _termination
from grpc.framework.core import _transmission
-from grpc.framework.core import _utilities # pylint: disable=unused-import
+from grpc.framework.core import _utilities
class _EasyOperation(_interfaces.Operation):
@@ -75,17 +75,19 @@ class _EasyOperation(_interfaces.Operation):
with self._lock:
self._reception_manager.receive_ticket(ticket)
- def abort(self, outcome):
+ def abort(self, outcome_kind):
with self._lock:
if self._termination_manager.outcome is None:
+ outcome = _utilities.Outcome(outcome_kind, None, None)
self._termination_manager.abort(outcome)
- self._transmission_manager.abort(outcome, None, None)
+ self._transmission_manager.abort(outcome)
self._expiration_manager.terminate()
def invocation_operate(
- operation_id, group, method, subscription, timeout, initial_metadata,
- payload, completion, ticket_sink, termination_action, pool):
+ operation_id, group, method, subscription, timeout, protocol_options,
+ initial_metadata, payload, completion, ticket_sink, termination_action,
+ pool):
"""Constructs objects necessary for front-side operation management.
Args:
@@ -95,6 +97,8 @@ def invocation_operate(
subscription: A base.Subscription describing the customer's interest in the
results of the operation.
timeout: A length of time in seconds to allow for the operation.
+ protocol_options: A transport-specific, application-specific, and/or
+ protocol-specific value relating to the invocation. May be None.
initial_metadata: An initial metadata value to be sent to the other side of
the operation. May be None if the initial metadata will be passed later or
if there will be no initial metadata passed at all.
@@ -120,23 +124,27 @@ def invocation_operate(
operation_id, ticket_sink, lock, pool, termination_manager)
expiration_manager = _expiration.invocation_expiration_manager(
timeout, lock, termination_manager, transmission_manager)
+ protocol_manager = _protocol.invocation_protocol_manager(
+ subscription, lock, pool, termination_manager, transmission_manager,
+ expiration_manager)
operation_context = _context.OperationContext(
lock, termination_manager, transmission_manager, expiration_manager)
emission_manager = _emission.EmissionManager(
lock, termination_manager, transmission_manager, expiration_manager)
ingestion_manager = _ingestion.invocation_ingestion_manager(
subscription, lock, pool, termination_manager, transmission_manager,
- expiration_manager)
+ expiration_manager, protocol_manager)
reception_manager = _reception.ReceptionManager(
termination_manager, transmission_manager, expiration_manager,
- ingestion_manager)
+ protocol_manager, ingestion_manager)
termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_expiration_manager(expiration_manager)
emission_manager.set_ingestion_manager(ingestion_manager)
transmission_manager.kick_off(
- group, method, timeout, initial_metadata, payload, completion, None)
+ group, method, timeout, protocol_options, initial_metadata, payload,
+ completion, None)
return _EasyOperation(
lock, termination_manager, transmission_manager, expiration_manager,
@@ -170,16 +178,20 @@ def service_operate(
ticket.timeout, servicer_package.default_timeout,
servicer_package.maximum_timeout, lock, termination_manager,
transmission_manager)
+ protocol_manager = _protocol.service_protocol_manager(
+ lock, pool, termination_manager, transmission_manager,
+ expiration_manager)
operation_context = _context.OperationContext(
lock, termination_manager, transmission_manager, expiration_manager)
emission_manager = _emission.EmissionManager(
lock, termination_manager, transmission_manager, expiration_manager)
ingestion_manager = _ingestion.service_ingestion_manager(
servicer_package.servicer, operation_context, emission_manager, lock,
- pool, termination_manager, transmission_manager, expiration_manager)
+ pool, termination_manager, transmission_manager, expiration_manager,
+ protocol_manager)
reception_manager = _reception.ReceptionManager(
termination_manager, transmission_manager, expiration_manager,
- ingestion_manager)
+ protocol_manager, ingestion_manager)
termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_expiration_manager(expiration_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_protocol.py b/src/python/grpcio/grpc/framework/core/_protocol.py
new file mode 100644
index 0000000000..3177b5e302
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_protocol.py
@@ -0,0 +1,176 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for passing protocol objects in an operation."""
+
+import collections
+import enum
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+
+_EXCEPTION_LOG_MESSAGE = 'Exception delivering protocol object!'
+
+_LOCAL_FAILURE_OUTCOME = _utilities.Outcome(
+ base.Outcome.Kind.LOCAL_FAILURE, None, None)
+
+
+class _Awaited(
+ collections.namedtuple('_Awaited', ('kind', 'value',))):
+
+ @enum.unique
+ class Kind(enum.Enum):
+ NOT_YET_ARRIVED = 'not yet arrived'
+ ARRIVED = 'arrived'
+
+_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None)
+_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None)
+
+
+class _Transitory(
+ collections.namedtuple('_Transitory', ('kind', 'value',))):
+
+ @enum.unique
+ class Kind(enum.Enum):
+ NOT_YET_SEEN = 'not yet seen'
+ PRESENT = 'present'
+ GONE = 'gone'
+
+_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None)
+_GONE = _Transitory(_Transitory.Kind.GONE, None)
+
+
+class _ProtocolManager(_interfaces.ProtocolManager):
+ """An implementation of _interfaces.ExpirationManager."""
+
+ def __init__(
+ self, protocol_receiver, lock, pool, termination_manager,
+ transmission_manager, expiration_manager):
+ """Constructor.
+
+ Args:
+ protocol_receiver: An _Awaited wrapping of the base.ProtocolReceiver to
+ which protocol objects should be passed during the operation. May be
+ of kind _Awaited.Kind.NOT_YET_ARRIVED if the customer's subscription is
+ not yet known and may be of kind _Awaited.Kind.ARRIVED but with a value
+ of None if the customer's subscription did not include a
+ ProtocolReceiver.
+ lock: The operation-wide lock.
+ pool: A thread pool.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ """
+ self._lock = lock
+ self._pool = pool
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = expiration_manager
+
+ self._protocol_receiver = protocol_receiver
+ self._context = _NOT_YET_SEEN
+
+ def _abort_and_notify(self, outcome):
+ if self._termination_manager.outcome is None:
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
+ self._expiration_manager.terminate()
+
+ def _deliver(self, behavior, value):
+ def deliver():
+ delivery_outcome = callable_util.call_logging_exceptions(
+ behavior, _EXCEPTION_LOG_MESSAGE, value)
+ if delivery_outcome.kind is callable_util.Outcome.Kind.RAISED:
+ with self._lock:
+ self._abort_and_notify(_LOCAL_FAILURE_OUTCOME)
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ deliver, _constants.INTERNAL_ERROR_LOG_MESSAGE))
+
+ def set_protocol_receiver(self, protocol_receiver):
+ """See _interfaces.ProtocolManager.set_protocol_receiver for spec."""
+ self._protocol_receiver = _Awaited(_Awaited.Kind.ARRIVED, protocol_receiver)
+ if (self._context.kind is _Transitory.Kind.PRESENT and
+ protocol_receiver is not None):
+ self._deliver(protocol_receiver.context, self._context.value)
+ self._context = _GONE
+
+ def accept_protocol_context(self, protocol_context):
+ """See _interfaces.ProtocolManager.accept_protocol_context for spec."""
+ if self._protocol_receiver.kind is _Awaited.Kind.ARRIVED:
+ if self._protocol_receiver.value is not None:
+ self._deliver(self._protocol_receiver.value.context, protocol_context)
+ self._context = _GONE
+ else:
+ self._context = _Transitory(_Transitory.Kind.PRESENT, protocol_context)
+
+
+def invocation_protocol_manager(
+ subscription, lock, pool, termination_manager, transmission_manager,
+ expiration_manager):
+ """Creates an _interfaces.ProtocolManager for invocation-side use.
+
+ Args:
+ subscription: The local customer's subscription to the operation.
+ lock: The operation-wide lock.
+ pool: A thread pool.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ """
+ if subscription.kind is base.Subscription.Kind.FULL:
+ awaited_protocol_receiver = _Awaited(
+ _Awaited.Kind.ARRIVED, subscription.protocol_receiver)
+ else:
+ awaited_protocol_receiver = _ARRIVED_AND_NONE
+ return _ProtocolManager(
+ awaited_protocol_receiver, lock, pool, termination_manager,
+ transmission_manager, expiration_manager)
+
+
+def service_protocol_manager(
+ lock, pool, termination_manager, transmission_manager, expiration_manager):
+ """Creates an _interfaces.ProtocolManager for service-side use.
+
+ Args:
+ lock: The operation-wide lock.
+ pool: A thread pool.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ """
+ return _ProtocolManager(
+ _NOT_YET_ARRIVED, lock, pool, termination_manager, transmission_manager,
+ expiration_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py
index 1cebe3874b..ff81450dee 100644
--- a/src/python/grpcio/grpc/framework/core/_reception.py
+++ b/src/python/grpcio/grpc/framework/core/_reception.py
@@ -30,39 +30,52 @@
"""State and behavior for ticket reception."""
from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.base import utilities
from grpc.framework.interfaces.links import links
-_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = {
- links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED,
- links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED,
- links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN,
- links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE,
+_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME_KIND = {
+ links.Ticket.Termination.CANCELLATION: base.Outcome.Kind.CANCELLED,
+ links.Ticket.Termination.EXPIRATION: base.Outcome.Kind.EXPIRED,
+ links.Ticket.Termination.SHUTDOWN: base.Outcome.Kind.REMOTE_SHUTDOWN,
+ links.Ticket.Termination.RECEPTION_FAILURE:
+ base.Outcome.Kind.RECEPTION_FAILURE,
links.Ticket.Termination.TRANSMISSION_FAILURE:
- base.Outcome.TRANSMISSION_FAILURE,
- links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE,
- links.Ticket.Termination.REMOTE_FAILURE: base.Outcome.LOCAL_FAILURE,
+ base.Outcome.Kind.TRANSMISSION_FAILURE,
+ links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.Kind.REMOTE_FAILURE,
+ links.Ticket.Termination.REMOTE_FAILURE: base.Outcome.Kind.LOCAL_FAILURE,
}
+_RECEPTION_FAILURE_OUTCOME = _utilities.Outcome(
+ base.Outcome.Kind.RECEPTION_FAILURE, None, None)
+
+
+def _carrying_protocol_context(ticket):
+ return ticket.protocol is not None and ticket.protocol.kind in (
+ links.Protocol.Kind.INVOCATION_CONTEXT,
+ links.Protocol.Kind.SERVICER_CONTEXT,)
+
class ReceptionManager(_interfaces.ReceptionManager):
"""A ReceptionManager based around a _Receiver passed to it."""
def __init__(
self, termination_manager, transmission_manager, expiration_manager,
- ingestion_manager):
+ protocol_manager, ingestion_manager):
"""Constructor.
Args:
termination_manager: The operation's _interfaces.TerminationManager.
transmission_manager: The operation's _interfaces.TransmissionManager.
expiration_manager: The operation's _interfaces.ExpirationManager.
+ protocol_manager: The operation's _interfaces.ProtocolManager.
ingestion_manager: The operation's _interfaces.IngestionManager.
"""
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = expiration_manager
+ self._protocol_manager = protocol_manager
self._ingestion_manager = ingestion_manager
self._lowest_unseen_sequence_number = 0
@@ -73,7 +86,7 @@ class ReceptionManager(_interfaces.ReceptionManager):
self._aborted = True
if self._termination_manager.outcome is None:
self._termination_manager.abort(outcome)
- self._transmission_manager.abort(None, None, None)
+ self._transmission_manager.abort(None)
self._expiration_manager.terminate()
def _sequence_failure(self, ticket):
@@ -95,6 +108,10 @@ class ReceptionManager(_interfaces.ReceptionManager):
def _process_one(self, ticket):
if ticket.sequence_number == 0:
self._ingestion_manager.set_group_and_method(ticket.group, ticket.method)
+ if _carrying_protocol_context(ticket):
+ self._protocol_manager.accept_protocol_context(ticket.protocol.value)
+ else:
+ self._protocol_manager.accept_protocol_context(None)
if ticket.timeout is not None:
self._expiration_manager.change_timeout(ticket.timeout)
if ticket.termination is None:
@@ -102,6 +119,7 @@ class ReceptionManager(_interfaces.ReceptionManager):
else:
completion = utilities.completion(
ticket.terminal_metadata, ticket.code, ticket.message)
+ self._termination_manager.reception_complete(ticket.code, ticket.message)
self._ingestion_manager.advance(
ticket.initial_metadata, ticket.payload, completion, ticket.allowance)
if ticket.allowance is not None:
@@ -129,10 +147,12 @@ class ReceptionManager(_interfaces.ReceptionManager):
if self._aborted:
return
elif self._sequence_failure(ticket):
- self._abort(base.Outcome.RECEPTION_FAILURE)
+ self._abort(_RECEPTION_FAILURE_OUTCOME)
elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION):
- outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination]
- self._abort(outcome)
+ outcome_kind = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME_KIND[
+ ticket.termination]
+ self._abort(
+ _utilities.Outcome(outcome_kind, ticket.code, ticket.message))
elif ticket.sequence_number == self._lowest_unseen_sequence_number:
self._process(ticket)
else:
diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py
index ad9f6123d8..bdb9147e5b 100644
--- a/src/python/grpcio/grpc/framework/core/_termination.py
+++ b/src/python/grpcio/grpc/framework/core/_termination.py
@@ -33,6 +33,7 @@ import abc
from grpc.framework.core import _constants
from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
from grpc.framework.foundation import callable_util
from grpc.framework.interfaces.base import base
@@ -74,7 +75,8 @@ class _TerminationManager(TerminationManager):
predicate: One of _invocation_completion_predicate or
_service_completion_predicate to be used to determine when the operation
has completed.
- action: A behavior to pass the operation outcome on operation termination.
+ action: A behavior to pass the operation outcome's kind on operation
+ termination.
pool: A thread pool.
"""
self._predicate = predicate
@@ -82,14 +84,19 @@ class _TerminationManager(TerminationManager):
self._pool = pool
self._expiration_manager = None
- self.outcome = None
self._callbacks = []
+ self._code = None
+ self._details = None
self._emission_complete = False
self._transmission_complete = False
self._reception_complete = False
self._ingestion_complete = False
+ # The None-ness of outcome is the operation-wide record of whether and how
+ # the operation has terminated.
+ self.outcome = None
+
def set_expiration_manager(self, expiration_manager):
self._expiration_manager = expiration_manager
@@ -106,8 +113,10 @@ class _TerminationManager(TerminationManager):
act = callable_util.with_exceptions_logged(
self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
- if outcome is base.Outcome.LOCAL_FAILURE:
- self._pool.submit(act, outcome)
+ # TODO(issue 3202): Don't call the local application's callbacks if it has
+ # previously shown a programming defect.
+ if False and outcome.kind is base.Outcome.Kind.LOCAL_FAILURE:
+ self._pool.submit(act, base.Outcome.Kind.LOCAL_FAILURE)
else:
def call_callbacks_and_act(callbacks, outcome):
for callback in callbacks:
@@ -115,9 +124,11 @@ class _TerminationManager(TerminationManager):
callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE,
outcome)
if callback_outcome.exception is not None:
- outcome = base.Outcome.LOCAL_FAILURE
+ act_outcome_kind = base.Outcome.Kind.LOCAL_FAILURE
break
- act(outcome)
+ else:
+ act_outcome_kind = outcome.kind
+ act(act_outcome_kind)
self._pool.submit(
callable_util.with_exceptions_logged(
@@ -132,7 +143,9 @@ class _TerminationManager(TerminationManager):
if self._predicate(
self._emission_complete, self._transmission_complete,
self._reception_complete, self._ingestion_complete):
- self._terminate_and_notify(base.Outcome.COMPLETED)
+ self._terminate_and_notify(
+ _utilities.Outcome(
+ base.Outcome.Kind.COMPLETED, self._code, self._details))
return True
else:
return False
@@ -163,10 +176,12 @@ class _TerminationManager(TerminationManager):
else:
return False
- def reception_complete(self):
+ def reception_complete(self, code, details):
"""See superclass method for specification."""
if self.outcome is None:
self._reception_complete = True
+ self._code = code
+ self._details = details
self._perhaps_complete()
def ingestion_complete(self):
@@ -177,7 +192,8 @@ class _TerminationManager(TerminationManager):
def expire(self):
"""See _interfaces.TerminationManager.expire for specification."""
- self._terminate_internal_only(base.Outcome.EXPIRED)
+ self._terminate_internal_only(
+ _utilities.Outcome(base.Outcome.Kind.EXPIRED, None, None))
def abort(self, outcome):
"""See _interfaces.TerminationManager.abort for specification."""
diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py
index efef87dd4c..65b12c4160 100644
--- a/src/python/grpcio/grpc/framework/core/_transmission.py
+++ b/src/python/grpcio/grpc/framework/core/_transmission.py
@@ -29,14 +29,21 @@
"""State and behavior for ticket transmission during an operation."""
+import collections
+import enum
+
from grpc.framework.core import _constants
from grpc.framework.core import _interfaces
+from grpc.framework.core import _utilities
from grpc.framework.foundation import callable_util
from grpc.framework.interfaces.base import base
from grpc.framework.interfaces.links import links
_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
+_TRANSMISSION_FAILURE_OUTCOME = _utilities.Outcome(
+ base.Outcome.Kind.TRANSMISSION_FAILURE, None, None)
+
def _explode_completion(completion):
if completion is None:
@@ -47,6 +54,31 @@ def _explode_completion(completion):
links.Ticket.Termination.COMPLETION)
+class _Abort(
+ collections.namedtuple(
+ '_Abort', ('kind', 'termination', 'code', 'details',))):
+ """Tracks whether the operation aborted and what is to be done about it.
+
+ Attributes:
+ kind: A Kind value describing the overall kind of the _Abort.
+ termination: A links.Ticket.Termination value to be sent to the other side
+ of the operation. Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
+ code: A code value to be sent to the other side of the operation. Only
+ valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
+ details: A details value to be sent to the other side of the operation.
+ Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ NOT_ABORTED = 'not aborted'
+ ABORTED_NOTIFY_NEEDED = 'aborted notify needed'
+ ABORTED_NO_NOTIFY = 'aborted no notify'
+
+_NOT_ABORTED = _Abort(_Abort.Kind.NOT_ABORTED, None, None, None)
+_ABORTED_NO_NOTIFY = _Abort(_Abort.Kind.ABORTED_NO_NOTIFY, None, None, None)
+
+
class TransmissionManager(_interfaces.TransmissionManager):
"""An _interfaces.TransmissionManager that sends links.Tickets."""
@@ -79,8 +111,7 @@ class TransmissionManager(_interfaces.TransmissionManager):
self._initial_metadata = None
self._payloads = []
self._completion = None
- self._aborted = False
- self._abortion_outcome = None
+ self._abort = _NOT_ABORTED
self._transmitting = False
def set_expiration_manager(self, expiration_manager):
@@ -94,24 +125,15 @@ class TransmissionManager(_interfaces.TransmissionManager):
A links.Ticket to be sent to the other side of the operation or None if
there is nothing to be sent at this time.
"""
- if self._aborted:
- if self._abortion_outcome is None:
- return None
- else:
- termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
- self._abortion_outcome]
- if termination is None:
- return None
- else:
- self._abortion_outcome = None
- if self._completion is None:
- code, message = None, None
- else:
- code, message = self._completion.code, self._completion.message
- return links.Ticket(
- self._operation_id, self._lowest_unused_sequence_number, None,
- None, None, None, None, None, None, None, code, message,
- termination, None)
+ if self._abort.kind is _Abort.Kind.ABORTED_NO_NOTIFY:
+ return None
+ elif self._abort.kind is _Abort.Kind.ABORTED_NOTIFY_NEEDED:
+ termination = self._abort.termination
+ code, details = self._abort.code, self._abort.details
+ self._abort = _ABORTED_NO_NOTIFY
+ return links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ None, None, None, None, None, None, code, details, termination, None)
action = False
# TODO(nathaniel): Support other subscriptions.
@@ -174,8 +196,9 @@ class TransmissionManager(_interfaces.TransmissionManager):
return
else:
with self._lock:
+ self._abort = _ABORTED_NO_NOTIFY
if self._termination_manager.outcome is None:
- self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE)
+ self._termination_manager.abort(_TRANSMISSION_FAILURE_OUTCOME)
self._expiration_manager.terminate()
return
@@ -184,23 +207,27 @@ class TransmissionManager(_interfaces.TransmissionManager):
self._transmitting = True
def kick_off(
- self, group, method, timeout, initial_metadata, payload, completion,
- allowance):
+ self, group, method, timeout, protocol_options, initial_metadata,
+ payload, completion, allowance):
"""See _interfaces.TransmissionManager.kickoff for specification."""
# TODO(nathaniel): Support other subscriptions.
subscription = links.Ticket.Subscription.FULL
terminal_metadata, code, message, termination = _explode_completion(
completion)
self._remote_allowance = 1 if payload is None else 0
+ protocol = links.Protocol(links.Protocol.Kind.CALL_OPTION, protocol_options)
ticket = links.Ticket(
self._operation_id, 0, group, method, subscription, timeout, allowance,
initial_metadata, payload, terminal_metadata, code, message,
- termination, None)
+ termination, protocol)
self._lowest_unused_sequence_number = 1
self._transmit(ticket)
def advance(self, initial_metadata, payload, completion, allowance):
"""See _interfaces.TransmissionManager.advance for specification."""
+ if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
+ return
+
effective_initial_metadata = initial_metadata
effective_payload = payload
effective_completion = completion
@@ -246,7 +273,9 @@ class TransmissionManager(_interfaces.TransmissionManager):
def timeout(self, timeout):
"""See _interfaces.TransmissionManager.timeout for specification."""
- if self._transmitting:
+ if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
+ return
+ elif self._transmitting:
self._timeout = timeout
else:
ticket = links.Ticket(
@@ -257,7 +286,9 @@ class TransmissionManager(_interfaces.TransmissionManager):
def allowance(self, allowance):
"""See _interfaces.TransmissionManager.allowance for specification."""
- if self._transmitting or not self._payloads:
+ if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
+ return
+ elif self._transmitting or not self._payloads:
self._remote_allowance += allowance
else:
self._remote_allowance += allowance - 1
@@ -281,22 +312,24 @@ class TransmissionManager(_interfaces.TransmissionManager):
self._remote_complete = True
self._local_allowance = 0
- def abort(self, outcome, code, message):
+ def abort(self, outcome):
"""See _interfaces.TransmissionManager.abort for specification."""
- if self._transmitting:
- self._aborted, self._abortion_outcome = True, outcome
- else:
- self._aborted = True
- if outcome is not None:
- termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
- outcome]
- if termination is not None:
- if self._completion is None:
- code, message = None, None
- else:
- code, message = self._completion.code, self._completion.message
+ if self._abort.kind is _Abort.Kind.NOT_ABORTED:
+ if outcome is None:
+ self._abort = _ABORTED_NO_NOTIFY
+ else:
+ termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION.get(
+ outcome.kind)
+ if termination is None:
+ self._abort = _ABORTED_NO_NOTIFY
+ elif self._transmitting:
+ self._abort = _Abort(
+ _Abort.Kind.ABORTED_NOTIFY_NEEDED, termination, outcome.code,
+ outcome.details)
+ else:
ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None,
- None, None, None, None, None, None, None, code, message,
- termination, None)
+ None, None, None, None, None, None, None, outcome.code,
+ outcome.details, termination, None)
self._transmit(ticket)
+ self._abort = _ABORTED_NO_NOTIFY
diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py
index 5b0d798751..abedc727e4 100644
--- a/src/python/grpcio/grpc/framework/core/_utilities.py
+++ b/src/python/grpcio/grpc/framework/core/_utilities.py
@@ -31,6 +31,8 @@
import collections
+from grpc.framework.interfaces.base import base
+
class ServicerPackage(
collections.namedtuple(
@@ -44,3 +46,9 @@ class ServicerPackage(
maximum_timeout: A float indicating the maximum length of time in seconds to
allow for an operation.
"""
+
+
+class Outcome(
+ base.Outcome,
+ collections.namedtuple('Outcome', ('kind', 'code', 'details',))):
+ """A trivial implementation of base.Outcome."""
diff --git a/src/python/grpcio/grpc/framework/crust/_calls.py b/src/python/grpcio/grpc/framework/crust/_calls.py
index f9077bedfe..bff940d747 100644
--- a/src/python/grpcio/grpc/framework/crust/_calls.py
+++ b/src/python/grpcio/grpc/framework/crust/_calls.py
@@ -38,10 +38,14 @@ _ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!'
_EMPTY_COMPLETION = utilities.completion(None, None, None)
-def _invoke(end, group, method, timeout, initial_metadata, payload, complete):
+def _invoke(
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ complete):
rendezvous = _control.Rendezvous(None, None)
+ subscription = utilities.full_subscription(
+ rendezvous, _control.protocol_receiver(rendezvous))
operation_context, operator = end.operate(
- group, method, utilities.full_subscription(rendezvous), timeout,
+ group, method, subscription, timeout, protocol_options=protocol_options,
initial_metadata=initial_metadata, payload=payload,
completion=_EMPTY_COMPLETION if complete else None)
rendezvous.set_operator_and_context(operator, operation_context)
@@ -93,36 +97,43 @@ def _event_return_stream(
def blocking_unary_unary(
- end, group, method, timeout, with_call, initial_metadata, payload):
+ end, group, method, timeout, with_call, protocol_options, initial_metadata,
+ payload):
"""Services in a blocking fashion a unary-unary servicer method."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
- end, group, method, timeout, initial_metadata, payload, True)
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ True)
if with_call:
- return next(rendezvous, rendezvous)
+ return next(rendezvous), rendezvous
else:
return next(rendezvous)
-def future_unary_unary(end, group, method, timeout, initial_metadata, payload):
+def future_unary_unary(
+ end, group, method, timeout, protocol_options, initial_metadata, payload):
"""Services a value-in value-out servicer method by returning a Future."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
- end, group, method, timeout, initial_metadata, payload, True)
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ True)
return rendezvous
-def inline_unary_stream(end, group, method, timeout, initial_metadata, payload):
+def inline_unary_stream(
+ end, group, method, timeout, protocol_options, initial_metadata, payload):
"""Services a value-in stream-out servicer method."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
- end, group, method, timeout, initial_metadata, payload, True)
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ True)
return rendezvous
def blocking_stream_unary(
- end, group, method, timeout, with_call, initial_metadata, payload_iterator,
- pool):
+ end, group, method, timeout, with_call, protocol_options, initial_metadata,
+ payload_iterator, pool):
"""Services in a blocking fashion a stream-in value-out servicer method."""
rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, initial_metadata, None, False)
+ end, group, method, timeout, protocol_options, initial_metadata, None,
+ False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
@@ -141,10 +152,12 @@ def blocking_stream_unary(
def future_stream_unary(
- end, group, method, timeout, initial_metadata, payload_iterator, pool):
+ end, group, method, timeout, protocol_options, initial_metadata,
+ payload_iterator, pool):
"""Services a stream-in value-out servicer method by returning a Future."""
rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, initial_metadata, None, False)
+ end, group, method, timeout, protocol_options, initial_metadata, None,
+ False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
@@ -155,10 +168,12 @@ def future_stream_unary(
def inline_stream_stream(
- end, group, method, timeout, initial_metadata, payload_iterator, pool):
+ end, group, method, timeout, protocol_options, initial_metadata,
+ payload_iterator, pool):
"""Services a stream-in stream-out servicer method."""
rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, initial_metadata, None, False)
+ end, group, method, timeout, protocol_options, initial_metadata, None,
+ False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
@@ -169,36 +184,40 @@ def inline_stream_stream(
def event_unary_unary(
- end, group, method, timeout, initial_metadata, payload, receiver,
- abortion_callback, pool):
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ receiver, abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, initial_metadata, payload, True)
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ True)
return _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_unary_stream(
- end, group, method, timeout, initial_metadata, payload,
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
receiver, abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, initial_metadata, payload, True)
+ end, group, method, timeout, protocol_options, initial_metadata, payload,
+ True)
return _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_stream_unary(
- end, group, method, timeout, initial_metadata, receiver, abortion_callback,
- pool):
+ end, group, method, timeout, protocol_options, initial_metadata, receiver,
+ abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, initial_metadata, None, False)
+ end, group, method, timeout, protocol_options, initial_metadata, None,
+ False)
return _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
def event_stream_stream(
- end, group, method, timeout, initial_metadata, receiver, abortion_callback,
- pool):
+ end, group, method, timeout, protocol_options, initial_metadata, receiver,
+ abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, initial_metadata, None, False)
+ end, group, method, timeout, protocol_options, initial_metadata, None,
+ False)
return _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
diff --git a/src/python/grpcio/grpc/framework/crust/_control.py b/src/python/grpcio/grpc/framework/crust/_control.py
index 01de3c15bd..5e9efdf732 100644
--- a/src/python/grpcio/grpc/framework/crust/_control.py
+++ b/src/python/grpcio/grpc/framework/crust/_control.py
@@ -110,30 +110,31 @@ class _Termination(
_NOT_TERMINATED = _Termination(False, None, None)
-_OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR = {
- base.Outcome.COMPLETED: lambda *unused_args: _Termination(True, None, None),
- base.Outcome.CANCELLED: lambda *args: _Termination(
+_OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR = {
+ base.Outcome.Kind.COMPLETED: lambda *unused_args: _Termination(
+ True, None, None),
+ base.Outcome.Kind.CANCELLED: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.CANCELLED, *args),
face.CancellationError(*args)),
- base.Outcome.EXPIRED: lambda *args: _Termination(
+ base.Outcome.Kind.EXPIRED: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.EXPIRED, *args),
face.ExpirationError(*args)),
- base.Outcome.LOCAL_SHUTDOWN: lambda *args: _Termination(
+ base.Outcome.Kind.LOCAL_SHUTDOWN: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.LOCAL_SHUTDOWN, *args),
face.LocalShutdownError(*args)),
- base.Outcome.REMOTE_SHUTDOWN: lambda *args: _Termination(
+ base.Outcome.Kind.REMOTE_SHUTDOWN: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.REMOTE_SHUTDOWN, *args),
face.RemoteShutdownError(*args)),
- base.Outcome.RECEPTION_FAILURE: lambda *args: _Termination(
+ base.Outcome.Kind.RECEPTION_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
face.NetworkError(*args)),
- base.Outcome.TRANSMISSION_FAILURE: lambda *args: _Termination(
+ base.Outcome.Kind.TRANSMISSION_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
face.NetworkError(*args)),
- base.Outcome.LOCAL_FAILURE: lambda *args: _Termination(
+ base.Outcome.Kind.LOCAL_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.LOCAL_FAILURE, *args),
face.LocalError(*args)),
- base.Outcome.REMOTE_FAILURE: lambda *args: _Termination(
+ base.Outcome.Kind.REMOTE_FAILURE: lambda *args: _Termination(
True, face.Abortion(face.Abortion.Kind.REMOTE_FAILURE, *args),
face.RemoteError(*args)),
}
@@ -181,6 +182,8 @@ class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call):
self._operator = operator
self._operation_context = operation_context
+ self._protocol_context = _NOT_YET_ARRIVED
+
self._up_initial_metadata = _NOT_YET_ARRIVED
self._up_payload = None
self._up_allowance = 1
@@ -247,13 +250,17 @@ class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call):
else:
initial_metadata = self._up_initial_metadata.value
if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
- terminal_metadata, code, details = None, None, None
+ terminal_metadata = None
else:
terminal_metadata = self._up_completion.value.terminal_metadata
+ if outcome.kind is base.Outcome.Kind.COMPLETED:
code = self._up_completion.value.code
details = self._up_completion.value.message
- self._termination = _OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR[
- outcome](initial_metadata, terminal_metadata, code, details)
+ else:
+ code = outcome.code
+ details = outcome.details
+ self._termination = _OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR[
+ outcome.kind](initial_metadata, terminal_metadata, code, details)
self._condition.notify_all()
@@ -437,6 +444,16 @@ class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call):
else:
return self._termination.abortion
+ def protocol_context(self):
+ with self._condition:
+ while True:
+ if self._protocol_context.kind is _Awaited.Kind.ARRIVED:
+ return self._protocol_context.value
+ elif self._termination.abortion_error is not None:
+ raise self._termination.abortion_error
+ else:
+ self._condition.wait()
+
def initial_metadata(self):
with self._condition:
while True:
@@ -509,11 +526,30 @@ class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call):
else:
self._down_details = _Transitory(_Transitory.Kind.PRESENT, details)
+ def set_protocol_context(self, protocol_context):
+ with self._condition:
+ self._protocol_context = _Awaited(
+ _Awaited.Kind.ARRIVED, protocol_context)
+ self._condition.notify_all()
+
def set_outcome(self, outcome):
with self._condition:
return self._set_outcome(outcome)
+class _ProtocolReceiver(base.ProtocolReceiver):
+
+ def __init__(self, rendezvous):
+ self._rendezvous = rendezvous
+
+ def context(self, protocol_context):
+ self._rendezvous.set_protocol_context(protocol_context)
+
+
+def protocol_receiver(rendezvous):
+ return _ProtocolReceiver(rendezvous)
+
+
def pool_wrap(behavior, operation_context):
"""Wraps an operation-related behavior so that it may be called in a pool.
diff --git a/src/python/grpcio/grpc/framework/crust/_service.py b/src/python/grpcio/grpc/framework/crust/_service.py
index 2455a58f59..9903415c09 100644
--- a/src/python/grpcio/grpc/framework/crust/_service.py
+++ b/src/python/grpcio/grpc/framework/crust/_service.py
@@ -52,6 +52,9 @@ class _ServicerContext(face.ServicerContext):
def cancel(self):
self._rendezvous.cancel()
+ def protocol_context(self):
+ return self._rendezvous.protocol_context()
+
def invocation_metadata(self):
return self._rendezvous.initial_metadata()
@@ -71,10 +74,12 @@ class _ServicerContext(face.ServicerContext):
def _adaptation(pool, in_pool):
def adaptation(operator, operation_context):
rendezvous = _control.Rendezvous(operator, operation_context)
+ subscription = utilities.full_subscription(
+ rendezvous, _control.protocol_receiver(rendezvous))
outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
if outcome is None:
pool.submit(_control.pool_wrap(in_pool, operation_context), rendezvous)
- return utilities.full_subscription(rendezvous)
+ return subscription
else:
raise abandonment.Abandoned()
return adaptation
@@ -151,16 +156,18 @@ def adapt_event_stream_stream(method, pool):
def adapt_multi_method(multi_method, pool):
def adaptation(group, method, operator, operation_context):
rendezvous = _control.Rendezvous(operator, operation_context)
+ subscription = utilities.full_subscription(
+ rendezvous, _control.protocol_receiver(rendezvous))
outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
if outcome is None:
def in_pool():
- request_consumer = multi_method(
+ request_consumer = multi_method.service(
group, method, rendezvous, _ServicerContext(rendezvous))
for request in rendezvous:
request_consumer.consume(request)
request_consumer.terminate()
pool.submit(_control.pool_wrap(in_pool, operation_context), rendezvous)
- return utilities.full_subscription(rendezvous)
+ return subscription
else:
raise abandonment.Abandoned()
return adaptation
diff --git a/src/python/grpcio/grpc/framework/crust/implementations.py b/src/python/grpcio/grpc/framework/crust/implementations.py
index 12f7e79641..4ebc4e9ae8 100644
--- a/src/python/grpcio/grpc/framework/crust/implementations.py
+++ b/src/python/grpcio/grpc/framework/crust/implementations.py
@@ -49,12 +49,12 @@ class _BaseServicer(base.Servicer):
return adapted_method(output_operator, context)
elif self._adapted_multi_method is not None:
try:
- return self._adapted_multi_method.service(
+ return self._adapted_multi_method(
group, method, output_operator, context)
except face.NoSuchMethodError:
- raise base.NoSuchMethodError()
+ raise base.NoSuchMethodError(None, None)
else:
- raise base.NoSuchMethodError()
+ raise base.NoSuchMethodError(None, None)
class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
@@ -66,22 +66,23 @@ class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
self._pool = pool
def __call__(
- self, request, timeout, metadata=None, with_call=False):
+ self, request, timeout, metadata=None, with_call=False,
+ protocol_options=None):
return _calls.blocking_unary_unary(
self._end, self._group, self._method, timeout, with_call,
- metadata, request)
+ protocol_options, metadata, request)
- def future(self, request, timeout, metadata=None):
+ def future(self, request, timeout, metadata=None, protocol_options=None):
return _calls.future_unary_unary(
- self._end, self._group, self._method, timeout, metadata,
- request)
+ self._end, self._group, self._method, timeout, protocol_options,
+ metadata, request)
def event(
self, request, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
return _calls.event_unary_unary(
- self._end, self._group, self._method, timeout, metadata,
- request, receiver, abortion_callback, self._pool)
+ self._end, self._group, self._method, timeout, protocol_options,
+ metadata, request, receiver, abortion_callback, self._pool)
class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
@@ -92,17 +93,17 @@ class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
self._method = method
self._pool = pool
- def __call__(self, request, timeout, metadata=None):
+ def __call__(self, request, timeout, metadata=None, protocol_options=None):
return _calls.inline_unary_stream(
- self._end, self._group, self._method, timeout, metadata,
- request)
+ self._end, self._group, self._method, timeout, protocol_options,
+ metadata, request)
def event(
self, request, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
return _calls.event_unary_stream(
- self._end, self._group, self._method, timeout, metadata,
- request, receiver, abortion_callback, self._pool)
+ self._end, self._group, self._method, timeout, protocol_options,
+ metadata, request, receiver, abortion_callback, self._pool)
class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
@@ -115,21 +116,23 @@ class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
def __call__(
self, request_iterator, timeout, metadata=None,
- with_call=False):
+ with_call=False, protocol_options=None):
return _calls.blocking_stream_unary(
self._end, self._group, self._method, timeout, with_call,
- metadata, request_iterator, self._pool)
+ protocol_options, metadata, request_iterator, self._pool)
- def future(self, request_iterator, timeout, metadata=None):
+ def future(
+ self, request_iterator, timeout, metadata=None, protocol_options=None):
return _calls.future_stream_unary(
- self._end, self._group, self._method, timeout, metadata,
- request_iterator, self._pool)
+ self._end, self._group, self._method, timeout, protocol_options,
+ metadata, request_iterator, self._pool)
def event(
- self, receiver, abortion_callback, timeout, metadata=None):
+ self, receiver, abortion_callback, timeout, metadata=None,
+ protocol_options=None):
return _calls.event_stream_unary(
- self._end, self._group, self._method, timeout, metadata,
- receiver, abortion_callback, self._pool)
+ self._end, self._group, self._method, timeout, protocol_options,
+ metadata, receiver, abortion_callback, self._pool)
class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
@@ -140,16 +143,18 @@ class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
self._method = method
self._pool = pool
- def __call__(self, request_iterator, timeout, metadata=None):
+ def __call__(
+ self, request_iterator, timeout, metadata=None, protocol_options=None):
return _calls.inline_stream_stream(
- self._end, self._group, self._method, timeout, metadata,
- request_iterator, self._pool)
+ self._end, self._group, self._method, timeout, protocol_options,
+ metadata, request_iterator, self._pool)
def event(
- self, receiver, abortion_callback, timeout, metadata=None):
+ self, receiver, abortion_callback, timeout, metadata=None,
+ protocol_options=None):
return _calls.event_stream_stream(
- self._end, self._group, self._method, timeout, metadata,
- receiver, abortion_callback, self._pool)
+ self._end, self._group, self._method, timeout, protocol_options,
+ metadata, receiver, abortion_callback, self._pool)
class _GenericStub(face.GenericStub):
@@ -161,66 +166,70 @@ class _GenericStub(face.GenericStub):
def blocking_unary_unary(
self, group, method, request, timeout, metadata=None,
- with_call=None):
+ with_call=None, protocol_options=None):
return _calls.blocking_unary_unary(
- self._end, group, method, timeout, with_call, metadata,
- request)
+ self._end, group, method, timeout, with_call, protocol_options,
+ metadata, request)
def future_unary_unary(
- self, group, method, request, timeout, metadata=None):
+ self, group, method, request, timeout, metadata=None,
+ protocol_options=None):
return _calls.future_unary_unary(
- self._end, group, method, timeout, metadata, request)
+ self._end, group, method, timeout, protocol_options, metadata, request)
def inline_unary_stream(
- self, group, method, request, timeout, metadata=None):
+ self, group, method, request, timeout, metadata=None,
+ protocol_options=None):
return _calls.inline_unary_stream(
- self._end, group, method, timeout, metadata, request)
+ self._end, group, method, timeout, protocol_options, metadata, request)
def blocking_stream_unary(
self, group, method, request_iterator, timeout, metadata=None,
- with_call=None):
+ with_call=None, protocol_options=None):
return _calls.blocking_stream_unary(
- self._end, group, method, timeout, with_call, metadata,
- request_iterator, self._pool)
+ self._end, group, method, timeout, with_call, protocol_options,
+ metadata, request_iterator, self._pool)
def future_stream_unary(
- self, group, method, request_iterator, timeout, metadata=None):
+ self, group, method, request_iterator, timeout, metadata=None,
+ protocol_options=None):
return _calls.future_stream_unary(
- self._end, group, method, timeout, metadata,
+ self._end, group, method, timeout, protocol_options, metadata,
request_iterator, self._pool)
def inline_stream_stream(
- self, group, method, request_iterator, timeout, metadata=None):
+ self, group, method, request_iterator, timeout, metadata=None,
+ protocol_options=None):
return _calls.inline_stream_stream(
- self._end, group, method, timeout, metadata,
+ self._end, group, method, timeout, protocol_options, metadata,
request_iterator, self._pool)
def event_unary_unary(
self, group, method, request, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
return _calls.event_unary_unary(
- self._end, group, method, timeout, metadata, request,
+ self._end, group, method, timeout, protocol_options, metadata, request,
receiver, abortion_callback, self._pool)
def event_unary_stream(
self, group, method, request, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
return _calls.event_unary_stream(
- self._end, group, method, timeout, metadata, request,
+ self._end, group, method, timeout, protocol_options, metadata, request,
receiver, abortion_callback, self._pool)
def event_stream_unary(
self, group, method, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
return _calls.event_stream_unary(
- self._end, group, method, timeout, metadata, receiver,
+ self._end, group, method, timeout, protocol_options, metadata, receiver,
abortion_callback, self._pool)
def event_stream_stream(
self, group, method, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
return _calls.event_stream_stream(
- self._end, group, method, timeout, metadata, receiver,
+ self._end, group, method, timeout, protocol_options, metadata, receiver,
abortion_callback, self._pool)
def unary_unary(self, group, method):
@@ -315,8 +324,11 @@ def servicer(method_implementations, multi_method_implementation, pool):
"""
adapted_implementations = _adapt_method_implementations(
method_implementations, pool)
- adapted_multi_method_implementation = _service.adapt_multi_method(
- multi_method_implementation, pool)
+ if multi_method_implementation is None:
+ adapted_multi_method_implementation = None
+ else:
+ adapted_multi_method_implementation = _service.adapt_multi_method(
+ multi_method_implementation, pool)
return _BaseServicer(
adapted_implementations, adapted_multi_method_implementation)
diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py
index bc52efb4c5..a1e70be5e8 100644
--- a/src/python/grpcio/grpc/framework/interfaces/base/base.py
+++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py
@@ -40,7 +40,7 @@ applications choose.
# threading is referenced from specification in this module.
import abc
import enum
-import threading
+import threading # pylint: disable=unused-import
# abandonment is referenced from specification in this module.
from grpc.framework.foundation import abandonment # pylint: disable=unused-import
@@ -69,19 +69,30 @@ class NoSuchMethodError(Exception):
self.details = details
-@enum.unique
-class Outcome(enum.Enum):
- """Operation outcomes."""
+class Outcome(object):
+ """The outcome of an operation.
- COMPLETED = 'completed'
- CANCELLED = 'cancelled'
- EXPIRED = 'expired'
- LOCAL_SHUTDOWN = 'local shutdown'
- REMOTE_SHUTDOWN = 'remote shutdown'
- RECEPTION_FAILURE = 'reception failure'
- TRANSMISSION_FAILURE = 'transmission failure'
- LOCAL_FAILURE = 'local failure'
- REMOTE_FAILURE = 'remote failure'
+ Attributes:
+ kind: A Kind value coarsely identifying how the operation terminated.
+ code: An application-specific code value or None if no such value was
+ provided.
+ details: An application-specific details value or None if no such value was
+ provided.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ """Ways in which an operation can terminate."""
+
+ COMPLETED = 'completed'
+ CANCELLED = 'cancelled'
+ EXPIRED = 'expired'
+ LOCAL_SHUTDOWN = 'local shutdown'
+ REMOTE_SHUTDOWN = 'remote shutdown'
+ RECEPTION_FAILURE = 'reception failure'
+ TRANSMISSION_FAILURE = 'transmission failure'
+ LOCAL_FAILURE = 'local failure'
+ REMOTE_FAILURE = 'remote failure'
class Completion(object):
@@ -173,6 +184,19 @@ class Operator(object):
"""
raise NotImplementedError()
+class ProtocolReceiver(object):
+ """A means of receiving protocol values during an operation."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def context(self, protocol_context):
+ """Accepts the protocol context object for the operation.
+
+ Args:
+ protocol_context: The protocol context object for the operation.
+ """
+ raise NotImplementedError()
+
class Subscription(object):
"""Describes customer code's interest in values from the other side.
@@ -188,7 +212,11 @@ class Subscription(object):
otherwise.
operator: An Operator to be passed values from the other side of the
operation. Must be non-None if kind is Kind.FULL. Must be None otherwise.
+ protocol_receiver: A ProtocolReceiver to be passed protocol objects as they
+ become available during the operation. Must be non-None if kind is
+ Kind.FULL.
"""
+ __metaclass__ = abc.ABCMeta
@enum.unique
class Kind(enum.Enum):
@@ -263,7 +291,7 @@ class End(object):
@abc.abstractmethod
def operate(
self, group, method, subscription, timeout, initial_metadata=None,
- payload=None, completion=None):
+ payload=None, completion=None, protocol_options=None):
"""Commences an operation.
Args:
@@ -279,6 +307,8 @@ class End(object):
payload: An initial payload for the operation.
completion: A Completion value indicating the end of transmission to the
other side of the operation.
+ protocol_options: A value specified by the provider of a Base interface
+ implementation affording custom state and behavior.
Returns:
A pair of objects affording information about the operation and action
@@ -294,8 +324,8 @@ class End(object):
"""Reports the number of terminated operations broken down by outcome.
Returns:
- A dictionary from Outcome value to an integer identifying the number
- of operations that terminated with that outcome.
+ A dictionary from Outcome.Kind value to an integer identifying the number
+ of operations that terminated with that outcome kind.
"""
raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/interfaces/base/utilities.py b/src/python/grpcio/grpc/framework/interfaces/base/utilities.py
index a9ee1a0981..87a85018f5 100644
--- a/src/python/grpcio/grpc/framework/interfaces/base/utilities.py
+++ b/src/python/grpcio/grpc/framework/interfaces/base/utilities.py
@@ -45,11 +45,12 @@ class _Subscription(
base.Subscription,
collections.namedtuple(
'_Subscription',
- ('kind', 'termination_callback', 'allowance', 'operator',))):
+ ('kind', 'termination_callback', 'allowance', 'operator',
+ 'protocol_receiver',))):
"""A trivial implementation of base.Subscription."""
_NONE_SUBSCRIPTION = _Subscription(
- base.Subscription.Kind.NONE, None, None, None)
+ base.Subscription.Kind.NONE, None, None, None, None)
def completion(terminal_metadata, code, message):
@@ -66,14 +67,16 @@ def completion(terminal_metadata, code, message):
return _Completion(terminal_metadata, code, message)
-def full_subscription(operator):
+def full_subscription(operator, protocol_receiver):
"""Creates a "full" base.Subscription for the given base.Operator.
Args:
operator: A base.Operator to be used in an operation.
+ protocol_receiver: A base.ProtocolReceiver to be used in an operation.
Returns:
A base.Subscription of kind base.Subscription.Kind.FULL wrapping the given
- base.Operator.
+ base.Operator and base.ProtocolReceiver.
"""
- return _Subscription(base.Subscription.Kind.FULL, None, None, operator)
+ return _Subscription(
+ base.Subscription.Kind.FULL, None, None, operator, protocol_receiver)
diff --git a/src/python/grpcio/grpc/framework/interfaces/face/face.py b/src/python/grpcio/grpc/framework/interfaces/face/face.py
index 948e7505b6..bc9a434a76 100644
--- a/src/python/grpcio/grpc/framework/interfaces/face/face.py
+++ b/src/python/grpcio/grpc/framework/interfaces/face/face.py
@@ -184,6 +184,16 @@ class RpcContext(object):
"""
raise NotImplementedError()
+ @abc.abstractmethod
+ def protocol_context(self):
+ """Accesses a custom object specified by an implementation provider.
+
+ Returns:
+ A value specified by the provider of a Face interface implementation
+ affording custom state and behavior.
+ """
+ raise NotImplementedError()
+
class Call(RpcContext):
"""Invocation-side utility object for an RPC."""
@@ -354,7 +364,8 @@ class UnaryUnaryMultiCallable(object):
@abc.abstractmethod
def __call__(
- self, request, timeout, metadata=None, with_call=False):
+ self, request, timeout, metadata=None, with_call=False,
+ protocol_options=None):
"""Synchronously invokes the underlying RPC.
Args:
@@ -364,6 +375,8 @@ class UnaryUnaryMultiCallable(object):
the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
to the reponse.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
@@ -375,7 +388,7 @@ class UnaryUnaryMultiCallable(object):
raise NotImplementedError()
@abc.abstractmethod
- def future(self, request, timeout, metadata=None):
+ def future(self, request, timeout, metadata=None, protocol_options=None):
"""Asynchronously invokes the underlying RPC.
Args:
@@ -383,6 +396,8 @@ class UnaryUnaryMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
An object that is both a Call for the RPC and a future.Future. In the
@@ -395,7 +410,7 @@ class UnaryUnaryMultiCallable(object):
@abc.abstractmethod
def event(
self, request, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
"""Asynchronously invokes the underlying RPC.
Args:
@@ -406,6 +421,8 @@ class UnaryUnaryMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
A Call for the RPC.
@@ -418,7 +435,7 @@ class UnaryStreamMultiCallable(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
- def __call__(self, request, timeout, metadata=None):
+ def __call__(self, request, timeout, metadata=None, protocol_options=None):
"""Invokes the underlying RPC.
Args:
@@ -426,6 +443,8 @@ class UnaryStreamMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
An object that is both a Call for the RPC and an iterator of response
@@ -437,7 +456,7 @@ class UnaryStreamMultiCallable(object):
@abc.abstractmethod
def event(
self, request, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
"""Asynchronously invokes the underlying RPC.
Args:
@@ -448,6 +467,8 @@ class UnaryStreamMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
A Call object for the RPC.
@@ -462,7 +483,7 @@ class StreamUnaryMultiCallable(object):
@abc.abstractmethod
def __call__(
self, request_iterator, timeout, metadata=None,
- with_call=False):
+ with_call=False, protocol_options=None):
"""Synchronously invokes the underlying RPC.
Args:
@@ -472,6 +493,8 @@ class StreamUnaryMultiCallable(object):
the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
to the reponse.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
@@ -483,7 +506,8 @@ class StreamUnaryMultiCallable(object):
raise NotImplementedError()
@abc.abstractmethod
- def future(self, request_iterator, timeout, metadata=None):
+ def future(
+ self, request_iterator, timeout, metadata=None, protocol_options=None):
"""Asynchronously invokes the underlying RPC.
Args:
@@ -491,6 +515,8 @@ class StreamUnaryMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
An object that is both a Call for the RPC and a future.Future. In the
@@ -502,7 +528,8 @@ class StreamUnaryMultiCallable(object):
@abc.abstractmethod
def event(
- self, receiver, abortion_callback, timeout, metadata=None):
+ self, receiver, abortion_callback, timeout, metadata=None,
+ protocol_options=None):
"""Asynchronously invokes the underlying RPC.
Args:
@@ -512,6 +539,8 @@ class StreamUnaryMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
A single object that is both a Call object for the RPC and a
@@ -525,7 +554,8 @@ class StreamStreamMultiCallable(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
- def __call__(self, request_iterator, timeout, metadata=None):
+ def __call__(
+ self, request_iterator, timeout, metadata=None, protocol_options=None):
"""Invokes the underlying RPC.
Args:
@@ -533,6 +563,8 @@ class StreamStreamMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
An object that is both a Call for the RPC and an iterator of response
@@ -543,7 +575,8 @@ class StreamStreamMultiCallable(object):
@abc.abstractmethod
def event(
- self, receiver, abortion_callback, timeout, metadata=None):
+ self, receiver, abortion_callback, timeout, metadata=None,
+ protocol_options=None):
"""Asynchronously invokes the underlying RPC.
Args:
@@ -553,6 +586,8 @@ class StreamStreamMultiCallable(object):
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of
the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
A single object that is both a Call object for the RPC and a
@@ -646,7 +681,7 @@ class GenericStub(object):
@abc.abstractmethod
def blocking_unary_unary(
self, group, method, request, timeout, metadata=None,
- with_call=False):
+ with_call=False, protocol_options=None):
"""Invokes a unary-request-unary-response method.
This method blocks until either returning the response value of the RPC
@@ -661,6 +696,8 @@ class GenericStub(object):
metadata: A metadata value to be passed to the service-side of the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
to the reponse.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
@@ -673,7 +710,8 @@ class GenericStub(object):
@abc.abstractmethod
def future_unary_unary(
- self, group, method, request, timeout, metadata=None):
+ self, group, method, request, timeout, metadata=None,
+ protocol_options=None):
"""Invokes a unary-request-unary-response method.
Args:
@@ -682,6 +720,8 @@ class GenericStub(object):
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
An object that is both a Call for the RPC and a future.Future. In the
@@ -693,7 +733,8 @@ class GenericStub(object):
@abc.abstractmethod
def inline_unary_stream(
- self, group, method, request, timeout, metadata=None):
+ self, group, method, request, timeout, metadata=None,
+ protocol_options=None):
"""Invokes a unary-request-stream-response method.
Args:
@@ -702,6 +743,8 @@ class GenericStub(object):
request: The request value for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
An object that is both a Call for the RPC and an iterator of response
@@ -713,7 +756,7 @@ class GenericStub(object):
@abc.abstractmethod
def blocking_stream_unary(
self, group, method, request_iterator, timeout, metadata=None,
- with_call=False):
+ with_call=False, protocol_options=None):
"""Invokes a stream-request-unary-response method.
This method blocks until either returning the response value of the RPC
@@ -728,6 +771,8 @@ class GenericStub(object):
metadata: A metadata value to be passed to the service-side of the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
to the reponse.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
@@ -740,7 +785,8 @@ class GenericStub(object):
@abc.abstractmethod
def future_stream_unary(
- self, group, method, request_iterator, timeout, metadata=None):
+ self, group, method, request_iterator, timeout, metadata=None,
+ protocol_options=None):
"""Invokes a stream-request-unary-response method.
Args:
@@ -749,6 +795,8 @@ class GenericStub(object):
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
An object that is both a Call for the RPC and a future.Future. In the
@@ -760,7 +808,8 @@ class GenericStub(object):
@abc.abstractmethod
def inline_stream_stream(
- self, group, method, request_iterator, timeout, metadata=None):
+ self, group, method, request_iterator, timeout, metadata=None,
+ protocol_options=None):
"""Invokes a stream-request-stream-response method.
Args:
@@ -769,6 +818,8 @@ class GenericStub(object):
request_iterator: An iterator that yields request values for the RPC.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
An object that is both a Call for the RPC and an iterator of response
@@ -780,7 +831,7 @@ class GenericStub(object):
@abc.abstractmethod
def event_unary_unary(
self, group, method, request, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
"""Event-driven invocation of a unary-request-unary-response method.
Args:
@@ -792,6 +843,8 @@ class GenericStub(object):
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
A Call for the RPC.
@@ -801,7 +854,7 @@ class GenericStub(object):
@abc.abstractmethod
def event_unary_stream(
self, group, method, request, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
"""Event-driven invocation of a unary-request-stream-response method.
Args:
@@ -813,6 +866,8 @@ class GenericStub(object):
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
A Call for the RPC.
@@ -822,7 +877,7 @@ class GenericStub(object):
@abc.abstractmethod
def event_stream_unary(
self, group, method, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
"""Event-driven invocation of a unary-request-unary-response method.
Args:
@@ -833,6 +888,8 @@ class GenericStub(object):
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
A pair of a Call object for the RPC and a stream.Consumer to which the
@@ -843,7 +900,7 @@ class GenericStub(object):
@abc.abstractmethod
def event_stream_stream(
self, group, method, receiver, abortion_callback, timeout,
- metadata=None):
+ metadata=None, protocol_options=None):
"""Event-driven invocation of a unary-request-stream-response method.
Args:
@@ -854,6 +911,8 @@ class GenericStub(object):
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
+ protocol_options: A value specified by the provider of a Face interface
+ implementation affording custom state and behavior.
Returns:
A pair of a Call object for the RPC and a stream.Consumer to which the
diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py
index b98a30a399..24f0e3b354 100644
--- a/src/python/grpcio/grpc/framework/interfaces/links/links.py
+++ b/src/python/grpcio/grpc/framework/interfaces/links/links.py
@@ -34,14 +34,13 @@ import collections
import enum
-class Transport(collections.namedtuple('Transport', ('kind', 'value',))):
- """A sum type for handles to an underlying transport system.
+class Protocol(collections.namedtuple('Protocol', ('kind', 'value',))):
+ """A sum type for handles to a system that transmits tickets.
Attributes:
- kind: A Kind value identifying the kind of value being passed to or from
- the underlying transport.
- value: The value being passed through RPC Framework between the high-level
- application and the underlying transport.
+ kind: A Kind value identifying the kind of value being passed.
+ value: The value being passed between the high-level application and the
+ system affording ticket transport.
"""
@enum.unique
@@ -56,8 +55,7 @@ class Ticket(
'Ticket',
('operation_id', 'sequence_number', 'group', 'method', 'subscription',
'timeout', 'allowance', 'initial_metadata', 'payload',
- 'terminal_metadata', 'code', 'message', 'termination',
- 'transport',))):
+ 'terminal_metadata', 'code', 'message', 'termination', 'protocol',))):
"""A sum type for all values sent from a front to a back.
Attributes:
@@ -99,8 +97,8 @@ class Ticket(
termination: A Termination value describing the end of the operation, or
None if the operation has not yet terminated. If set, no further tickets
may be sent in the same direction.
- transport: A Transport value or None, with further semantics being a matter
- between high-level application and underlying transport.
+ protocol: A Protocol value or None, with further semantics being a matter
+ between high-level application and underlying ticket transport.
"""
@enum.unique
diff --git a/src/python/grpcio/requirements.txt b/src/python/grpcio/requirements.txt
index 43395df03b..608ba402e0 100644
--- a/src/python/grpcio/requirements.txt
+++ b/src/python/grpcio/requirements.txt
@@ -1,3 +1,2 @@
enum34==1.0.4
futures==2.2.0
-protobuf==3.0.0a3
diff --git a/src/python/grpcio/setup.py b/src/python/grpcio/setup.py
index caa71a4f7c..151b2bfcb4 100644
--- a/src/python/grpcio/setup.py
+++ b/src/python/grpcio/setup.py
@@ -104,7 +104,7 @@ _COMMAND_CLASS = {
setuptools.setup(
name='grpcio',
- version='0.10.0a0',
+ version='0.11.0b0',
ext_modules=_EXTENSION_MODULES,
packages=list(_PACKAGES),
package_dir=_PACKAGE_DIRECTORIES,
diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py
index fcde0dab8c..35253ba312 100644
--- a/src/python/grpcio_health_checking/setup.py
+++ b/src/python/grpcio_health_checking/setup.py
@@ -51,7 +51,7 @@ _PACKAGE_DIRECTORIES = {
}
_INSTALL_REQUIRES = (
- 'grpcio>=0.10.0a0',
+ 'grpcio>=0.11.0b0',
)
_SETUP_REQUIRES = _INSTALL_REQUIRES
@@ -63,7 +63,7 @@ _COMMAND_CLASS = {
setuptools.setup(
name='grpcio_health_checking',
- version='0.10.0a0',
+ version='0.11.0b0',
packages=list(_PACKAGES),
package_dir=_PACKAGE_DIRECTORIES,
install_requires=_INSTALL_REQUIRES,
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py
index 4c8c64b06d..259b978de2 100644
--- a/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py
@@ -42,7 +42,7 @@ import threading
import time
import unittest
-from grpc.beta import beta
+from grpc.beta import implementations
from grpc.framework.foundation import future
from grpc.framework.interfaces.face import face
from grpc_test.framework.common import test_constants
@@ -170,7 +170,7 @@ def _CreateService(test_pb2):
server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
port = server.add_insecure_port('[::]:0')
server.start()
- channel = beta.create_insecure_channel('localhost', port)
+ channel = implementations.insecure_channel('localhost', port)
stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
yield servicer_methods, stub
server.stop(0)
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py
index 27a5b82e9c..90ad0b9bcb 100644
--- a/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py
+++ b/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py
@@ -191,7 +191,7 @@ class EchoTest(unittest.TestCase):
metadata[server_leading_binary_metadata_key])
for datum in test_data:
- client_call.write(datum, write_tag)
+ client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS)
write_accepted = self.client_events.get()
self.assertIsNotNone(write_accepted)
self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
@@ -206,7 +206,7 @@ class EchoTest(unittest.TestCase):
self.assertIsNotNone(read_accepted.bytes)
server_data.append(read_accepted.bytes)
- server_call.write(read_accepted.bytes, write_tag)
+ server_call.write(read_accepted.bytes, write_tag, 0)
write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
@@ -370,14 +370,14 @@ class CancellationTest(unittest.TestCase):
self.assertIsNotNone(metadata_accepted)
for datum in test_data:
- client_call.write(datum, write_tag)
+ client_call.write(datum, write_tag, 0)
write_accepted = self.client_events.get()
server_call.read(read_tag)
read_accepted = self.server_events.get()
server_data.append(read_accepted.bytes)
- server_call.write(read_accepted.bytes, write_tag)
+ server_call.write(read_accepted.bytes, write_tag, 0)
write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
index f0bd989ea6..cafb6b6eae 100644
--- a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
+++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
@@ -38,6 +38,7 @@ import unittest
from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc._links import service
+from grpc.beta import interfaces as beta_interfaces
from grpc.framework.core import implementations
from grpc.framework.interfaces.base import utilities
from grpc_test import test_common as grpc_test_common
@@ -45,8 +46,6 @@ from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.base import test_cases
from grpc_test.framework.interfaces.base import test_interfaces
-_CODE = _intermediary_low.Code.OK
-
class _SerializationBehaviors(
collections.namedtuple(
@@ -124,8 +123,8 @@ class _Implementation(test_interfaces.Implementation):
def service_completion(self):
return utilities.completion(
- grpc_test_common.SERVICE_TERMINAL_METADATA, _CODE,
- grpc_test_common.DETAILS)
+ grpc_test_common.SERVICE_TERMINAL_METADATA,
+ beta_interfaces.StatusCode.OK, grpc_test_common.DETAILS)
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is None or grpc_test_common.metadata_transmitted(
diff --git a/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py b/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py
index 28c0619f7c..a4d4dee38c 100644
--- a/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py
+++ b/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py
@@ -35,6 +35,7 @@ import unittest
from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc._links import service
+from grpc.beta import interfaces as beta_interfaces
from grpc.framework.core import implementations as core_implementations
from grpc.framework.crust import implementations as crust_implementations
from grpc.framework.foundation import logging_pool
@@ -139,7 +140,7 @@ class _Implementation(test_interfaces.Implementation):
return grpc_test_common.SERVICE_TERMINAL_METADATA
def code(self):
- return _intermediary_low.Code.OK
+ return beta_interfaces.StatusCode.OK
def details(self):
return grpc_test_common.DETAILS
diff --git a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py
index 716323cc20..77e83d5561 100644
--- a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py
+++ b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py
@@ -34,6 +34,7 @@ import unittest
from grpc._adapter import _intermediary_low
from grpc._links import invocation
from grpc._links import service
+from grpc.beta import interfaces as beta_interfaces
from grpc.framework.interfaces.links import links
from grpc_test import test_common
from grpc_test._links import _proto_scenarios
@@ -93,7 +94,8 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
return None, None
def create_service_completion(self):
- return _intermediary_low.Code.OK, 'An exuberant test "details" message!'
+ return (
+ beta_interfaces.StatusCode.OK, b'An exuberant test "details" message!')
def assertMetadataTransmitted(self, original_metadata, transmitted_metadata):
self.assertTrue(
@@ -110,7 +112,7 @@ class RoundTripTest(unittest.TestCase):
test_group = 'test package.Test Group'
test_method = 'test method'
identity_transformation = {(test_group, test_method): _IDENTITY}
- test_code = _intermediary_low.Code.OK
+ test_code = beta_interfaces.StatusCode.OK
test_message = 'a test message'
service_link = service.service_link(
@@ -150,11 +152,13 @@ class RoundTripTest(unittest.TestCase):
self.assertIs(
invocation_mate.tickets()[-1].termination,
links.Ticket.Termination.COMPLETION)
+ self.assertIs(invocation_mate.tickets()[-1].code, test_code)
+ self.assertEqual(invocation_mate.tickets()[-1].message, test_message)
def _perform_scenario_test(self, scenario):
test_operation_id = object()
test_group, test_method = scenario.group_and_method()
- test_code = _intermediary_low.Code.OK
+ test_code = beta_interfaces.StatusCode.OK
test_message = 'a scenario test message'
service_link = service.service_link(
diff --git a/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py
new file mode 100644
index 0000000000..fad57da9d0
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py
@@ -0,0 +1,232 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests Face interface compliance of the gRPC Python Beta API."""
+
+import threading
+import unittest
+
+from grpc.beta import implementations
+from grpc.beta import interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.interfaces.face import utilities
+from grpc_test import resources
+from grpc_test.beta import test_utilities
+from grpc_test.framework.common import test_constants
+
+_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
+
+_GROUP = 'group'
+_UNARY_UNARY = 'unary-unary'
+_UNARY_STREAM = 'unary-stream'
+_STREAM_UNARY = 'stream-unary'
+_STREAM_STREAM = 'stream-stream'
+
+_REQUEST = b'abc'
+_RESPONSE = b'123'
+
+
+class _Servicer(object):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._peer = None
+ self._serviced = False
+
+ def unary_unary(self, request, context):
+ with self._condition:
+ self._request = request
+ self._peer = context.protocol_context().peer()
+ context.protocol_context().disable_next_response_compression()
+ self._serviced = True
+ self._condition.notify_all()
+ return _RESPONSE
+
+ def unary_stream(self, request, context):
+ with self._condition:
+ self._request = request
+ self._peer = context.protocol_context().peer()
+ context.protocol_context().disable_next_response_compression()
+ self._serviced = True
+ self._condition.notify_all()
+ return
+ yield
+
+ def stream_unary(self, request_iterator, context):
+ for request in request_iterator:
+ self._request = request
+ with self._condition:
+ self._peer = context.protocol_context().peer()
+ context.protocol_context().disable_next_response_compression()
+ self._serviced = True
+ self._condition.notify_all()
+ return _RESPONSE
+
+ def stream_stream(self, request_iterator, context):
+ for request in request_iterator:
+ with self._condition:
+ self._peer = context.protocol_context().peer()
+ context.protocol_context().disable_next_response_compression()
+ yield _RESPONSE
+ with self._condition:
+ self._serviced = True
+ self._condition.notify_all()
+
+ def peer(self):
+ with self._condition:
+ return self._peer
+
+ def block_until_serviced(self):
+ with self._condition:
+ while not self._serviced:
+ self._condition.wait()
+
+
+class _BlockingIterator(object):
+
+ def __init__(self, upstream):
+ self._condition = threading.Condition()
+ self._upstream = upstream
+ self._allowed = []
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ with self._condition:
+ while True:
+ if self._allowed is None:
+ raise StopIteration()
+ elif self._allowed:
+ return self._allowed.pop(0)
+ else:
+ self._condition.wait()
+
+ def allow(self):
+ with self._condition:
+ try:
+ self._allowed.append(next(self._upstream))
+ except StopIteration:
+ self._allowed = None
+ self._condition.notify_all()
+
+
+class BetaFeaturesTest(unittest.TestCase):
+
+ def setUp(self):
+ self._servicer = _Servicer()
+ method_implementations = {
+ (_GROUP, _UNARY_UNARY):
+ utilities.unary_unary_inline(self._servicer.unary_unary),
+ (_GROUP, _UNARY_STREAM):
+ utilities.unary_stream_inline(self._servicer.unary_stream),
+ (_GROUP, _STREAM_UNARY):
+ utilities.stream_unary_inline(self._servicer.stream_unary),
+ (_GROUP, _STREAM_STREAM):
+ utilities.stream_stream_inline(self._servicer.stream_stream),
+ }
+
+ cardinalities = {
+ _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
+ _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
+ _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
+ _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
+ }
+
+ server_options = implementations.server_options(
+ thread_pool_size=test_constants.POOL_SIZE)
+ self._server = implementations.server(
+ method_implementations, options=server_options)
+ server_credentials = implementations.ssl_server_credentials(
+ [(resources.private_key(), resources.certificate_chain(),),])
+ port = self._server.add_secure_port('[::]:0', server_credentials)
+ self._server.start()
+ self._client_credentials = implementations.ssl_client_credentials(
+ resources.test_root_certificates(), None, None)
+ channel = test_utilities.not_really_secure_channel(
+ 'localhost', port, self._client_credentials, _SERVER_HOST_OVERRIDE)
+ stub_options = implementations.stub_options(
+ thread_pool_size=test_constants.POOL_SIZE)
+ self._dynamic_stub = implementations.dynamic_stub(
+ channel, _GROUP, cardinalities, options=stub_options)
+
+ def tearDown(self):
+ self._dynamic_stub = None
+ self._server.stop(test_constants.SHORT_TIMEOUT).wait()
+
+ def test_unary_unary(self):
+ call_options = interfaces.grpc_call_options(
+ disable_compression=True, credentials=self._client_credentials)
+ response = getattr(self._dynamic_stub, _UNARY_UNARY)(
+ _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options)
+ self.assertEqual(_RESPONSE, response)
+ self.assertIsNotNone(self._servicer.peer())
+
+ def test_unary_stream(self):
+ call_options = interfaces.grpc_call_options(
+ disable_compression=True, credentials=self._client_credentials)
+ response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)(
+ _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options)
+ self._servicer.block_until_serviced()
+ self.assertIsNotNone(self._servicer.peer())
+
+ def test_stream_unary(self):
+ call_options = interfaces.grpc_call_options(
+ credentials=self._client_credentials)
+ request_iterator = _BlockingIterator(iter((_REQUEST,)))
+ response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future(
+ request_iterator, test_constants.LONG_TIMEOUT,
+ protocol_options=call_options)
+ response_future.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ response_future.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ self._servicer.block_until_serviced()
+ self.assertIsNotNone(self._servicer.peer())
+ self.assertEqual(_RESPONSE, response_future.result())
+
+ def test_stream_stream(self):
+ call_options = interfaces.grpc_call_options(
+ credentials=self._client_credentials)
+ request_iterator = _BlockingIterator(iter((_REQUEST,)))
+ response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)(
+ request_iterator, test_constants.SHORT_TIMEOUT,
+ protocol_options=call_options)
+ response_iterator.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ response = next(response_iterator)
+ response_iterator.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ self._servicer.block_until_serviced()
+ self.assertIsNotNone(self._servicer.peer())
+ self.assertEqual(_RESPONSE, response)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
index 038464889d..b3c05bdb0c 100644
--- a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
@@ -36,14 +36,9 @@ import unittest
from grpc._adapter import _low
from grpc._adapter import _types
from grpc.beta import _connectivity_channel
+from grpc.beta import interfaces
from grpc_test.framework.common import test_constants
-_MAPPING_FUNCTION = lambda integer: integer * 200 + 17
-_MAPPING = {
- state: _MAPPING_FUNCTION(state) for state in _types.ConnectivityState}
-_IDLE, _CONNECTING, _READY, _TRANSIENT_FAILURE, _FATAL_FAILURE = map(
- _MAPPING_FUNCTION, _types.ConnectivityState)
-
def _drive_completion_queue(completion_queue):
while True:
@@ -84,7 +79,7 @@ class ChannelConnectivityTest(unittest.TestCase):
callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel, _MAPPING)
+ low_channel)
connectivity_channel.subscribe(callback.update, try_to_connect=False)
first_connectivities = callback.block_until_connectivities_satisfy(bool)
connectivity_channel.subscribe(callback.update, try_to_connect=True)
@@ -98,11 +93,16 @@ class ChannelConnectivityTest(unittest.TestCase):
connectivity_channel.unsubscribe(callback.update)
fifth_connectivities = callback.connectivities()
- self.assertSequenceEqual((_IDLE,), first_connectivities)
- self.assertNotIn(_READY, second_connectivities)
- self.assertNotIn(_READY, third_connectivities)
- self.assertNotIn(_READY, fourth_connectivities)
- self.assertNotIn(_READY, fifth_connectivities)
+ self.assertSequenceEqual(
+ (interfaces.ChannelConnectivity.IDLE,), first_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, second_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, third_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, fourth_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, fifth_connectivities)
def test_immediately_connectable_channel_connectivity(self):
server_completion_queue = _low.CompletionQueue()
@@ -117,7 +117,7 @@ class ChannelConnectivityTest(unittest.TestCase):
second_callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel, _MAPPING)
+ low_channel)
connectivity_channel.subscribe(first_callback.update, try_to_connect=False)
first_connectivities = first_callback.block_until_connectivities_satisfy(
bool)
@@ -132,9 +132,11 @@ class ChannelConnectivityTest(unittest.TestCase):
bool)
# Wait for a connection that will happen (or may already have happened).
first_callback.block_until_connectivities_satisfy(
- lambda connectivities: _READY in connectivities)
+ lambda connectivities:
+ interfaces.ChannelConnectivity.READY in connectivities)
second_callback.block_until_connectivities_satisfy(
- lambda connectivities: _READY in connectivities)
+ lambda connectivities:
+ interfaces.ChannelConnectivity.READY in connectivities)
connectivity_channel.unsubscribe(first_callback.update)
connectivity_channel.unsubscribe(second_callback.update)
@@ -142,12 +144,19 @@ class ChannelConnectivityTest(unittest.TestCase):
server_completion_queue.shutdown()
server_completion_queue_thread.join()
- self.assertSequenceEqual((_IDLE,), first_connectivities)
- self.assertSequenceEqual((_IDLE,), second_connectivities)
- self.assertNotIn(_TRANSIENT_FAILURE, third_connectivities)
- self.assertNotIn(_FATAL_FAILURE, third_connectivities)
- self.assertNotIn(_TRANSIENT_FAILURE, fourth_connectivities)
- self.assertNotIn(_FATAL_FAILURE, fourth_connectivities)
+ self.assertSequenceEqual(
+ (interfaces.ChannelConnectivity.IDLE,), first_connectivities)
+ self.assertSequenceEqual(
+ (interfaces.ChannelConnectivity.IDLE,), second_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.FATAL_FAILURE, third_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.TRANSIENT_FAILURE,
+ fourth_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.FATAL_FAILURE, fourth_connectivities)
def test_reachable_then_unreachable_channel_connectivity(self):
server_completion_queue = _low.CompletionQueue()
@@ -161,14 +170,16 @@ class ChannelConnectivityTest(unittest.TestCase):
callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel, _MAPPING)
+ low_channel)
connectivity_channel.subscribe(callback.update, try_to_connect=True)
callback.block_until_connectivities_satisfy(
- lambda connectivities: _READY in connectivities)
+ lambda connectivities:
+ interfaces.ChannelConnectivity.READY in connectivities)
# Now take down the server and confirm that channel readiness is repudiated.
server.shutdown()
callback.block_until_connectivities_satisfy(
- lambda connectivities: connectivities[-1] is not _READY)
+ lambda connectivities:
+ connectivities[-1] is not interfaces.ChannelConnectivity.READY)
connectivity_channel.unsubscribe(callback.update)
server.shutdown()
diff --git a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py
index ce4c59c0ee..aa33e1e6f8 100644
--- a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py
@@ -32,8 +32,8 @@
import collections
import unittest
-from grpc._adapter import _intermediary_low
-from grpc.beta import beta
+from grpc.beta import implementations
+from grpc.beta import interfaces
from grpc_test import resources
from grpc_test import test_common as grpc_test_common
from grpc_test.beta import test_utilities
@@ -81,25 +81,26 @@ class _Implementation(test_interfaces.Implementation):
method: method_object.cardinality()
for (group, method), method_object in methods.iteritems()}
- server_options = beta.server_options(
+ server_options = implementations.server_options(
request_deserializers=serialization_behaviors.request_deserializers,
response_serializers=serialization_behaviors.response_serializers,
thread_pool_size=test_constants.POOL_SIZE)
- server = beta.server(method_implementations, options=server_options)
- server_credentials = beta.ssl_server_credentials(
+ server = implementations.server(
+ method_implementations, options=server_options)
+ server_credentials = implementations.ssl_server_credentials(
[(resources.private_key(), resources.certificate_chain(),),])
port = server.add_secure_port('[::]:0', server_credentials)
server.start()
- client_credentials = beta.ssl_client_credentials(
+ client_credentials = implementations.ssl_client_credentials(
resources.test_root_certificates(), None, None)
- channel = test_utilities.create_not_really_secure_channel(
+ channel = test_utilities.not_really_secure_channel(
'localhost', port, client_credentials, _SERVER_HOST_OVERRIDE)
- stub_options = beta.stub_options(
+ stub_options = implementations.stub_options(
request_serializers=serialization_behaviors.request_serializers,
response_deserializers=serialization_behaviors.response_deserializers,
thread_pool_size=test_constants.POOL_SIZE)
- generic_stub = beta.generic_stub(channel, options=stub_options)
- dynamic_stub = beta.dynamic_stub(
+ generic_stub = implementations.generic_stub(channel, options=stub_options)
+ dynamic_stub = implementations.dynamic_stub(
channel, service, cardinalities, options=stub_options)
return generic_stub, {service: dynamic_stub}, server
@@ -116,7 +117,7 @@ class _Implementation(test_interfaces.Implementation):
return grpc_test_common.SERVICE_TERMINAL_METADATA
def code(self):
- return _intermediary_low.Code.OK
+ return interfaces.StatusCode.OK
def details(self):
return grpc_test_common.DETAILS
diff --git a/src/python/grpcio_test/grpc_test/beta/_not_found_test.py b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py
new file mode 100644
index 0000000000..5feb997fef
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py
@@ -0,0 +1,75 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of RPC-method-not-found behavior."""
+
+import unittest
+
+from grpc.beta import implementations
+from grpc.beta import interfaces
+from grpc.framework.interfaces.face import face
+from grpc_test.framework.common import test_constants
+
+
+class NotFoundTest(unittest.TestCase):
+
+ def setUp(self):
+ self._server = implementations.server({})
+ port = self._server.add_insecure_port('[::]:0')
+ channel = implementations.insecure_channel('localhost', port)
+ self._generic_stub = implementations.generic_stub(channel)
+ self._server.start()
+
+ def tearDown(self):
+ self._server.stop(0).wait()
+ self._generic_stub = None
+
+ def test_blocking_unary_unary_not_found(self):
+ with self.assertRaises(face.LocalError) as exception_assertion_context:
+ self._generic_stub.blocking_unary_unary(
+ 'groop', 'meffod', b'abc', test_constants.LONG_TIMEOUT,
+ with_call=True)
+ self.assertIs(
+ exception_assertion_context.exception.code,
+ interfaces.StatusCode.UNIMPLEMENTED)
+
+ def test_future_stream_unary_not_found(self):
+ rpc_future = self._generic_stub.future_stream_unary(
+ 'grupe', 'mevvod', b'def', test_constants.LONG_TIMEOUT)
+ with self.assertRaises(face.LocalError) as exception_assertion_context:
+ rpc_future.result()
+ self.assertIs(
+ exception_assertion_context.exception.code,
+ interfaces.StatusCode.UNIMPLEMENTED)
+ self.assertIs(
+ rpc_future.exception().code, interfaces.StatusCode.UNIMPLEMENTED)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
index 998e74ccf4..996cea9118 100644
--- a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
@@ -35,7 +35,7 @@ import unittest
from grpc._adapter import _low
from grpc._adapter import _types
-from grpc.beta import beta
+from grpc.beta import implementations
from grpc.beta import utilities
from grpc.framework.foundation import future
from grpc_test.framework.common import test_constants
@@ -69,7 +69,7 @@ class _Callback(object):
class ChannelConnectivityTest(unittest.TestCase):
def test_lonely_channel_connectivity(self):
- channel = beta.create_insecure_channel('localhost', 12345)
+ channel = implementations.insecure_channel('localhost', 12345)
callback = _Callback()
ready_future = utilities.channel_ready_future(channel)
@@ -94,7 +94,7 @@ class ChannelConnectivityTest(unittest.TestCase):
server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue, args=(server_completion_queue,))
server_completion_queue_thread.start()
- channel = beta.create_insecure_channel('localhost', port)
+ channel = implementations.insecure_channel('localhost', port)
callback = _Callback()
try:
diff --git a/src/python/grpcio_test/grpc_test/beta/test_utilities.py b/src/python/grpcio_test/grpc_test/beta/test_utilities.py
index 338670478d..24a8600e12 100644
--- a/src/python/grpcio_test/grpc_test/beta/test_utilities.py
+++ b/src/python/grpcio_test/grpc_test/beta/test_utilities.py
@@ -30,25 +30,27 @@
"""Test-appropriate entry points into the gRPC Python Beta API."""
from grpc._adapter import _intermediary_low
-from grpc.beta import beta
+from grpc.beta import implementations
-def create_not_really_secure_channel(
+def not_really_secure_channel(
host, port, client_credentials, server_host_override):
"""Creates an insecure Channel to a remote host.
Args:
host: The name of the remote host to which to connect.
port: The port of the remote host to which to connect.
- client_credentials: The beta.ClientCredentials with which to connect.
+ client_credentials: The implementations.ClientCredentials with which to
+ connect.
server_host_override: The target name used for SSL host name checking.
Returns:
- A beta.Channel to the remote host through which RPCs may be conducted.
+ An implementations.Channel to the remote host through which RPCs may be
+ conducted.
"""
hostport = '%s:%d' % (host, port)
intermediary_low_channel = _intermediary_low.Channel(
hostport, client_credentials._intermediary_low_credentials,
server_host_override=server_host_override)
- return beta.Channel(
+ return implementations.Channel(
intermediary_low_channel._internal, intermediary_low_channel)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py
index e4d2a7a0d7..46a01876d8 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py
@@ -236,8 +236,8 @@ class Instruction(
collections.namedtuple(
'Instruction',
('kind', 'advance_args', 'advance_kwargs', 'conclude_success',
- 'conclude_message', 'conclude_invocation_outcome',
- 'conclude_service_outcome',))):
+ 'conclude_message', 'conclude_invocation_outcome_kind',
+ 'conclude_service_outcome_kind',))):
""""""
@enum.unique
@@ -532,24 +532,24 @@ class _SequenceController(Controller):
self._state.service_side_outcome = outcome
if self._todo is not None or self._remaining_elements:
self._failed('Premature service-side outcome %s!' % (outcome,))
- elif outcome is not self._sequence.outcome.service:
+ elif outcome.kind is not self._sequence.outcome_kinds.service:
self._failed(
- 'Incorrect service-side outcome: %s should have been %s' % (
- outcome, self._sequence.outcome.service))
+ 'Incorrect service-side outcome kind: %s should have been %s' % (
+ outcome.kind, self._sequence.outcome_kinds.service))
elif self._state.invocation_side_outcome is not None:
- self._passed(self._state.invocation_side_outcome, outcome)
+ self._passed(self._state.invocation_side_outcome.kind, outcome.kind)
def invocation_on_termination(self, outcome):
with self._condition:
self._state.invocation_side_outcome = outcome
if self._todo is not None or self._remaining_elements:
self._failed('Premature invocation-side outcome %s!' % (outcome,))
- elif outcome is not self._sequence.outcome.invocation:
+ elif outcome.kind is not self._sequence.outcome_kinds.invocation:
self._failed(
- 'Incorrect invocation-side outcome: %s should have been %s' % (
- outcome, self._sequence.outcome.invocation))
+ 'Incorrect invocation-side outcome kind: %s should have been %s' % (
+ outcome.kind, self._sequence.outcome_kinds.invocation))
elif self._state.service_side_outcome is not None:
- self._passed(outcome, self._state.service_side_outcome)
+ self._passed(outcome.kind, self._state.service_side_outcome.kind)
class _SequenceControllerCreator(ControllerCreator):
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py
index 1d77aaebe6..f547d91681 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py
@@ -103,13 +103,14 @@ class Element(collections.namedtuple('Element', ('kind', 'transmission',))):
SERVICE_FAILURE = 'service failure'
-class Outcome(collections.namedtuple('Outcome', ('invocation', 'service',))):
+class OutcomeKinds(
+ collections.namedtuple('Outcome', ('invocation', 'service',))):
"""A description of the expected outcome of an operation test.
Attributes:
- invocation: The base.Outcome value expected on the invocation side of the
- operation.
- service: The base.Outcome value expected on the service side of the
+ invocation: The base.Outcome.Kind value expected on the invocation side of
+ the operation.
+ service: The base.Outcome.Kind value expected on the service side of the
operation.
"""
@@ -117,7 +118,8 @@ class Outcome(collections.namedtuple('Outcome', ('invocation', 'service',))):
class Sequence(
collections.namedtuple(
'Sequence',
- ('name', 'maximum_duration', 'invocation', 'elements', 'outcome',))):
+ ('name', 'maximum_duration', 'invocation', 'elements',
+ 'outcome_kinds',))):
"""Describes at a high level steps to perform in a test.
Attributes:
@@ -128,7 +130,8 @@ class Sequence(
under test.
elements: A sequence of Element values describing at coarse granularity
actions to take during the operation under test.
- outcome: An Outcome value describing the expected outcome of the test.
+ outcome_kinds: An OutcomeKinds value describing the expected outcome kinds
+ of the test.
"""
_EASY = Sequence(
@@ -139,7 +142,7 @@ _EASY = Sequence(
Element(
Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, True)),
),
- Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED))
+ OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED))
_PEASY = Sequence(
'Peasy',
@@ -154,7 +157,7 @@ _PEASY = Sequence(
Element(
Element.Kind.SERVICE_TRANSMISSION, Transmission(False, True, True)),
),
- Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED))
+ OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED))
# TODO(issue 2959): Finish this test suite. This tuple of sequences should
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
index 87332cf612..ddda1018c3 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
@@ -44,7 +44,8 @@ from grpc_test.framework.interfaces.base import test_interfaces
_SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True))
-_EMPTY_OUTCOME_DICT = {outcome: 0 for outcome in base.Outcome}
+_EMPTY_OUTCOME_KIND_DICT = {
+ outcome_kind: 0 for outcome_kind in base.Outcome.Kind}
class _Serialization(test_interfaces.Serialization):
@@ -118,8 +119,19 @@ class _Operator(base.Operator):
'Deliberately raised exception from Operator.advance (in a test)!')
+class _ProtocolReceiver(base.ProtocolReceiver):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._contexts = []
+
+ def context(self, protocol_context):
+ with self._condition:
+ self._contexts.append(protocol_context)
+
+
class _Servicer(base.Servicer):
- """An base.Servicer with instrumented for testing."""
+ """A base.Servicer with instrumented for testing."""
def __init__(self, group, method, controllers, pool):
self._condition = threading.Condition()
@@ -143,7 +155,7 @@ class _Servicer(base.Servicer):
controller.service_on_termination)
if outcome is not None:
controller.service_on_termination(outcome)
- return utilities.full_subscription(operator)
+ return utilities.full_subscription(operator, _ProtocolReceiver())
class _OperationTest(unittest.TestCase):
@@ -168,7 +180,8 @@ class _OperationTest(unittest.TestCase):
test_operator = _Operator(
self._controller, self._controller.on_invocation_advance,
self._pool, None)
- subscription = utilities.full_subscription(test_operator)
+ subscription = utilities.full_subscription(
+ test_operator, _ProtocolReceiver())
else:
# TODO(nathaniel): support and test other subscription kinds.
self.fail('Non-full subscriptions not yet supported!')
@@ -223,11 +236,12 @@ class _OperationTest(unittest.TestCase):
self.assertTrue(
instruction.conclude_success, msg=instruction.conclude_message)
- expected_invocation_stats = dict(_EMPTY_OUTCOME_DICT)
- expected_invocation_stats[instruction.conclude_invocation_outcome] += 1
+ expected_invocation_stats = dict(_EMPTY_OUTCOME_KIND_DICT)
+ expected_invocation_stats[
+ instruction.conclude_invocation_outcome_kind] += 1
self.assertDictEqual(expected_invocation_stats, invocation_stats)
- expected_service_stats = dict(_EMPTY_OUTCOME_DICT)
- expected_service_stats[instruction.conclude_service_outcome] += 1
+ expected_service_stats = dict(_EMPTY_OUTCOME_KIND_DICT)
+ expected_service_stats[instruction.conclude_service_outcome_kind] += 1
self.assertDictEqual(expected_service_stats, service_stats)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
index b7dd5d4d17..2d2a081955 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -82,8 +82,8 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
for test_messages in test_messages_sequence:
request = test_messages.request()
- response = self._invoker.blocking(group, method)(
- request, test_constants.LONG_TIMEOUT)
+ response, call = self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT, with_call=True)
test_messages.verify(request, response, self)
@@ -105,8 +105,8 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
for test_messages in test_messages_sequence:
requests = test_messages.requests()
- response = self._invoker.blocking(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
+ response, call = self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT, with_call=True)
test_messages.verify(requests, response, self)
diff --git a/src/python/grpcio_test/requirements.txt b/src/python/grpcio_test/requirements.txt
index 856198def5..fea80ca07f 100644
--- a/src/python/grpcio_test/requirements.txt
+++ b/src/python/grpcio_test/requirements.txt
@@ -1,5 +1,6 @@
+grpcio>=0.11.0b0
+oauth2client>=1.4.7
+protobuf>=3.0.0a3
pytest>=2.6
pytest-cov>=2.0
pytest-xdist>=1.11
-oauth2client>=1.4.7
-grpcio>=0.10.0a0
diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py
index 802dd1e53a..216119f0e7 100644
--- a/src/python/grpcio_test/setup.py
+++ b/src/python/grpcio_test/setup.py
@@ -71,7 +71,7 @@ _SETUP_REQUIRES = (
_INSTALL_REQUIRES = (
'oauth2client>=1.4.7',
- 'grpcio>=0.10.0a0',
+ 'grpcio>=0.11.0b0',
)
_COMMAND_CLASS = {
@@ -80,7 +80,7 @@ _COMMAND_CLASS = {
setuptools.setup(
name='grpcio_test',
- version='0.10.0a0',
+ version='0.11.0b0',
packages=_PACKAGES,
package_dir=_PACKAGE_DIRECTORIES,
package_data=_PACKAGE_DATA,