aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
Diffstat (limited to 'src/python')
-rw-r--r--src/python/README.md37
-rw-r--r--src/python/grpcio/README.rst3
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/module.c6
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types.h9
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types/call.c14
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types/channel.c58
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c4
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types/server.c6
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/utility.c22
-rw-r--r--src/python/grpcio/grpc/_adapter/_intermediary_low.py2
-rw-r--r--src/python/grpcio/grpc/_adapter/_low.py18
-rw-r--r--src/python/grpcio/grpc/_adapter/_types.py95
-rw-r--r--src/python/grpcio/grpc/_links/service.py43
-rw-r--r--src/python/grpcio/grpc/early_adopter/implementations.py30
-rw-r--r--src/python/grpcio/grpc/framework/core/__init__.py30
-rw-r--r--src/python/grpcio/grpc/framework/core/_constants.py59
-rw-r--r--src/python/grpcio/grpc/framework/core/_context.py92
-rw-r--r--src/python/grpcio/grpc/framework/core/_emission.py97
-rw-r--r--src/python/grpcio/grpc/framework/core/_end.py251
-rw-r--r--src/python/grpcio/grpc/framework/core/_expiration.py152
-rw-r--r--src/python/grpcio/grpc/framework/core/_ingestion.py410
-rw-r--r--src/python/grpcio/grpc/framework/core/_interfaces.py308
-rw-r--r--src/python/grpcio/grpc/framework/core/_operation.py192
-rw-r--r--src/python/grpcio/grpc/framework/core/_reception.py137
-rw-r--r--src/python/grpcio/grpc/framework/core/_termination.py212
-rw-r--r--src/python/grpcio/grpc/framework/core/_transmission.py294
-rw-r--r--src/python/grpcio/grpc/framework/core/_utilities.py46
-rw-r--r--src/python/grpcio/grpc/framework/core/implementations.py62
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/base/__init__.py30
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/base/base.py290
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/base/utilities.py79
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/links/links.py2
-rw-r--r--src/python/grpcio_health_checking/MANIFEST.in2
-rw-r--r--src/python/grpcio_health_checking/README.rst9
-rw-r--r--src/python/grpcio_health_checking/commands.py80
-rw-r--r--src/python/grpcio_health_checking/grpc/__init__.py30
-rw-r--r--src/python/grpcio_health_checking/grpc/health/__init__.py30
-rw-r--r--src/python/grpcio_health_checking/grpc/health/v1alpha/__init__.py30
-rw-r--r--src/python/grpcio_health_checking/grpc/health/v1alpha/health.proto49
-rw-r--r--src/python/grpcio_health_checking/grpc/health/v1alpha/health.py129
-rw-r--r--src/python/grpcio_health_checking/setup.py72
-rw-r--r--src/python/grpcio_test/grpc_interop/_interop_test_case.py3
-rw-r--r--src/python/grpcio_test/grpc_interop/methods.py23
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/__init__.py30
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py541
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/test.proto139
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_low_test.py30
-rw-r--r--src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py165
-rw-r--r--src/python/grpcio_test/grpc_test/framework/common/test_constants.py10
-rw-r--r--src/python/grpcio_test/grpc_test/framework/core/__init__.py30
-rw-r--r--src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py96
-rw-r--r--src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py34
-rw-r--r--src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py64
-rw-r--r--src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py45
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py30
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py568
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py168
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py55
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py262
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py186
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py9
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py101
-rw-r--r--src/python/grpcio_test/setup.py11
63 files changed, 5979 insertions, 142 deletions
diff --git a/src/python/README.md b/src/python/README.md
index 2beb3a913a..de0142db05 100644
--- a/src/python/README.md
+++ b/src/python/README.md
@@ -9,12 +9,36 @@ Alpha : Ready for early adopters
PREREQUISITES
-------------
- Python 2.7, virtualenv, pip
-- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core.
+- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core.
INSTALLATION
-------------
-On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][].
-Run the following command to install gRPC Python.
+
+**Linux (Debian):**
+
+Add [Debian unstable][] to your `sources.list` file. Example:
+
+```sh
+echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \
+sudo tee -a /etc/apt/sources.list
+```
+
+Install the gRPC Debian package
+
+```sh
+sudo apt-get update
+sudo apt-get install libgrpc-dev
+```
+
+Install the gRPC Python module
+
+```sh
+sudo pip install grpcio
+```
+
+**Mac OS X**
+
+Install [homebrew][]. Run the following command to install gRPC Python.
```sh
$ curl -fsSL https://goo.gl/getgrpc | bash -s python
```
@@ -27,11 +51,6 @@ Please read our online documentation for a [Quick Start][] and a [detailed examp
BUILDING FROM SOURCE
---------------------
- Clone this repository
-- Build the gRPC core from the root of the
- [gRPC Git repository](https://github.com/grpc/grpc)
-```
-$ make shared_c static_c
-```
- Use build_python.sh to build the Python code and install it into a virtual environment
```
@@ -60,7 +79,7 @@ $ ../../tools/distrib/python/submit.py
```
[homebrew]:http://brew.sh
-[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
[Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html
[detailed example]:http://www.grpc.io/docs/installation/python.html
+[Debian unstable]:https://www.debian.org/releases/sid/
diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst
index 00bdecf56f..c7b5a3bde4 100644
--- a/src/python/grpcio/README.rst
+++ b/src/python/grpcio/README.rst
@@ -6,7 +6,7 @@ Package for GRPC Python.
Dependencies
------------
-Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. On Linux, install linuxbrew_.
+Ensure you have installed the gRPC core. On Mac OS X, install homebrew_.
Run the following command to install gRPC Python.
::
@@ -19,5 +19,4 @@ Otherwise, `install from source`_
.. _`install from source`: https://github.com/grpc/grpc/blob/master/src/python/README.md#building-from-source
.. _homebrew: http://brew.sh
-.. _linuxbrew: https://github.com/Homebrew/linuxbrew#installation
.. _`gRPC install script`: https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
diff --git a/src/python/grpcio/grpc/_adapter/_c/module.c b/src/python/grpcio/grpc/_adapter/_c/module.c
index 1f3aedd9d8..9b93b051f6 100644
--- a/src/python/grpcio/grpc/_adapter/_c/module.c
+++ b/src/python/grpcio/grpc/_adapter/_c/module.c
@@ -53,6 +53,12 @@ PyMODINIT_FUNC init_c(void) {
return;
}
+ if (PyModule_AddStringConstant(
+ module, "PRIMARY_USER_AGENT_KEY",
+ GRPC_ARG_PRIMARY_USER_AGENT_STRING) < 0) {
+ return;
+ }
+
/* GRPC maintains an internal counter of how many times it has been
initialized and handles multiple pairs of grpc_init()/grpc_shutdown()
invocations accordingly. */
diff --git a/src/python/grpcio/grpc/_adapter/_c/types.h b/src/python/grpcio/grpc/_adapter/_c/types.h
index 4e0da4a28a..f646465c63 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types.h
+++ b/src/python/grpcio/grpc/_adapter/_c/types.h
@@ -113,6 +113,7 @@ Call *pygrpc_Call_new_empty(CompletionQueue *cq);
void pygrpc_Call_dealloc(Call *self);
PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs);
+PyObject *pygrpc_Call_peer(Call *self);
extern PyTypeObject pygrpc_Call_type;
@@ -129,6 +130,11 @@ Channel *pygrpc_Channel_new(
void pygrpc_Channel_dealloc(Channel *self);
Call *pygrpc_Channel_create_call(
Channel *self, PyObject *args, PyObject *kwargs);
+PyObject *pygrpc_Channel_check_connectivity_state(Channel *self, PyObject *args,
+ PyObject *kwargs);
+PyObject *pygrpc_Channel_watch_connectivity_state(Channel *self, PyObject *args,
+ PyObject *kwargs);
+PyObject *pygrpc_Channel_target(Channel *self);
extern PyTypeObject pygrpc_Channel_type;
@@ -181,6 +187,9 @@ pygrpc_tag *pygrpc_produce_request_tag(PyObject *user_tag, Call *empty_call);
/* Construct a tag associated with a server shutdown. */
pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag);
+/* Construct a tag associated with a channel state change. */
+pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag);
+
/* Frees all resources owned by the tag and the tag itself. */
void pygrpc_discard_tag(pygrpc_tag *tag);
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/call.c b/src/python/grpcio/grpc/_adapter/_c/types/call.c
index 0739070044..42a50151f6 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/call.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/call.c
@@ -42,6 +42,7 @@
PyMethodDef pygrpc_Call_methods[] = {
{"start_batch", (PyCFunction)pygrpc_Call_start_batch, METH_KEYWORDS, ""},
{"cancel", (PyCFunction)pygrpc_Call_cancel, METH_KEYWORDS, ""},
+ {"peer", (PyCFunction)pygrpc_Call_peer, METH_NOARGS, ""},
{NULL}
};
const char pygrpc_Call_doc[] = "See grpc._adapter._types.Call.";
@@ -131,7 +132,7 @@ PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs)
}
}
tag = pygrpc_produce_batch_tag(user_tag, self, ops, nops);
- errcode = grpc_call_start_batch(self->c_call, tag->ops, tag->nops, tag);
+ errcode = grpc_call_start_batch(self->c_call, tag->ops, tag->nops, tag, NULL);
gpr_free(ops);
return PyInt_FromLong(errcode);
}
@@ -151,13 +152,20 @@ PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs) {
return NULL;
}
code = PyInt_AsLong(py_code);
- errcode = grpc_call_cancel_with_status(self->c_call, code, details);
+ errcode = grpc_call_cancel_with_status(self->c_call, code, details, NULL);
} else if (py_code != NULL || details != NULL) {
PyErr_SetString(PyExc_ValueError,
"if `code` is specified, so must `details`");
return NULL;
} else {
- errcode = grpc_call_cancel(self->c_call);
+ errcode = grpc_call_cancel(self->c_call, NULL);
}
return PyInt_FromLong(errcode);
}
+
+PyObject *pygrpc_Call_peer(Call *self) {
+ char *peer = grpc_call_get_peer(self->c_call);
+ PyObject *py_peer = PyString_FromString(peer);
+ gpr_free(peer);
+ return py_peer;
+}
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/channel.c b/src/python/grpcio/grpc/_adapter/_c/types/channel.c
index 963104742f..c577ac05eb 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/channel.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/channel.c
@@ -36,10 +36,14 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
PyMethodDef pygrpc_Channel_methods[] = {
{"create_call", (PyCFunction)pygrpc_Channel_create_call, METH_KEYWORDS, ""},
+ {"check_connectivity_state", (PyCFunction)pygrpc_Channel_check_connectivity_state, METH_KEYWORDS, ""},
+ {"watch_connectivity_state", (PyCFunction)pygrpc_Channel_watch_connectivity_state, METH_KEYWORDS, ""},
+ {"target", (PyCFunction)pygrpc_Channel_target, METH_NOARGS, ""},
{NULL}
};
const char pygrpc_Channel_doc[] = "See grpc._adapter._types.Channel.";
@@ -104,7 +108,7 @@ Channel *pygrpc_Channel_new(
if (creds) {
self->c_chan = grpc_secure_channel_create(creds->c_creds, target, &c_args);
} else {
- self->c_chan = grpc_insecure_channel_create(target, &c_args);
+ self->c_chan = grpc_insecure_channel_create(target, &c_args, NULL);
}
pygrpc_discard_channel_args(c_args);
return self;
@@ -122,13 +126,61 @@ Call *pygrpc_Channel_create_call(
const char *host;
double deadline;
char *keywords[] = {"cq", "method", "host", "deadline", NULL};
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!ssd:create_call", keywords,
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!szd:create_call", keywords,
&pygrpc_CompletionQueue_type, &cq, &method, &host, &deadline)) {
return NULL;
}
call = pygrpc_Call_new_empty(cq);
call->c_call = grpc_channel_create_call(
self->c_chan, NULL, GRPC_PROPAGATE_DEFAULTS, cq->c_cq, method, host,
- pygrpc_cast_double_to_gpr_timespec(deadline));
+ pygrpc_cast_double_to_gpr_timespec(deadline), NULL);
return call;
}
+
+PyObject *pygrpc_Channel_check_connectivity_state(
+ Channel *self, PyObject *args, PyObject *kwargs) {
+ PyObject *py_try_to_connect;
+ int try_to_connect;
+ char *keywords[] = {"try_to_connect", NULL};
+ grpc_connectivity_state state;
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:connectivity_state", keywords,
+ &py_try_to_connect)) {
+ return NULL;
+ }
+ if (!PyBool_Check(py_try_to_connect)) {
+ Py_XDECREF(py_try_to_connect);
+ return NULL;
+ }
+ try_to_connect = Py_True == py_try_to_connect;
+ Py_DECREF(py_try_to_connect);
+ state = grpc_channel_check_connectivity_state(self->c_chan, try_to_connect);
+ return PyInt_FromLong(state);
+}
+
+PyObject *pygrpc_Channel_watch_connectivity_state(
+ Channel *self, PyObject *args, PyObject *kwargs) {
+ PyObject *tag;
+ double deadline;
+ int last_observed_state;
+ CompletionQueue *completion_queue;
+ char *keywords[] = {"last_observed_state", "deadline",
+ "completion_queue", "tag"};
+ if (!PyArg_ParseTupleAndKeywords(
+ args, kwargs, "idO!O:watch_connectivity_state", keywords,
+ &last_observed_state, &deadline, &pygrpc_CompletionQueue_type,
+ &completion_queue, &tag)) {
+ return NULL;
+ }
+ grpc_channel_watch_connectivity_state(
+ self->c_chan, (grpc_connectivity_state)last_observed_state,
+ pygrpc_cast_double_to_gpr_timespec(deadline), completion_queue->c_cq,
+ pygrpc_produce_channel_state_change_tag(tag));
+ Py_RETURN_NONE;
+}
+
+PyObject *pygrpc_Channel_target(Channel *self) {
+ char *target = grpc_channel_get_target(self->c_chan);
+ PyObject *py_target = PyString_FromString(target);
+ gpr_free(target);
+ return py_target;
+}
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c b/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c
index 2dd44b6ddd..d8bb89ca4b 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c
@@ -90,7 +90,7 @@ PyTypeObject pygrpc_CompletionQueue_type = {
CompletionQueue *pygrpc_CompletionQueue_new(
PyTypeObject *type, PyObject *args, PyObject *kwargs) {
CompletionQueue *self = (CompletionQueue *)type->tp_alloc(type, 0);
- self->c_cq = grpc_completion_queue_create();
+ self->c_cq = grpc_completion_queue_create(NULL);
return self;
}
@@ -111,7 +111,7 @@ PyObject *pygrpc_CompletionQueue_next(
}
Py_BEGIN_ALLOW_THREADS;
event = grpc_completion_queue_next(
- self->c_cq, pygrpc_cast_double_to_gpr_timespec(deadline));
+ self->c_cq, pygrpc_cast_double_to_gpr_timespec(deadline), NULL);
Py_END_ALLOW_THREADS;
transliterated_event = pygrpc_consume_event(event);
return transliterated_event;
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/server.c b/src/python/grpcio/grpc/_adapter/_c/types/server.c
index c2190ea672..15c98f28eb 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/server.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/server.c
@@ -96,7 +96,7 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
PyObject *py_args;
grpc_channel_args c_args;
char *keywords[] = {"cq", "args", NULL};
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O:Channel", keywords,
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O:Server", keywords,
&pygrpc_CompletionQueue_type, &cq, &py_args)) {
return NULL;
}
@@ -104,8 +104,8 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
return NULL;
}
self = (Server *)type->tp_alloc(type, 0);
- self->c_serv = grpc_server_create(&c_args);
- grpc_server_register_completion_queue(self->c_serv, cq->c_cq);
+ self->c_serv = grpc_server_create(&c_args, NULL);
+ grpc_server_register_completion_queue(self->c_serv, cq->c_cq, NULL);
pygrpc_discard_channel_args(c_args);
self->cq = cq;
Py_INCREF(self->cq);
diff --git a/src/python/grpcio/grpc/_adapter/_c/utility.c b/src/python/grpcio/grpc/_adapter/_c/utility.c
index 51f3c9be01..590f7e013a 100644
--- a/src/python/grpcio/grpc/_adapter/_c/utility.c
+++ b/src/python/grpcio/grpc/_adapter/_c/utility.c
@@ -88,6 +88,19 @@ pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag) {
return tag;
}
+pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag) {
+ pygrpc_tag *tag = gpr_malloc(sizeof(pygrpc_tag));
+ tag->user_tag = user_tag;
+ Py_XINCREF(tag->user_tag);
+ tag->call = NULL;
+ tag->ops = NULL;
+ tag->nops = 0;
+ grpc_call_details_init(&tag->request_call_details);
+ grpc_metadata_array_init(&tag->request_metadata);
+ tag->is_new_call = 0;
+ return tag;
+}
+
void pygrpc_discard_tag(pygrpc_tag *tag) {
if (!tag) {
return;
@@ -139,7 +152,7 @@ PyObject *pygrpc_consume_event(grpc_event event) {
}
int pygrpc_produce_op(PyObject *op, grpc_op *result) {
- static const int OP_TUPLE_SIZE = 5;
+ static const int OP_TUPLE_SIZE = 6;
static const int STATUS_TUPLE_SIZE = 2;
static const int TYPE_INDEX = 0;
static const int INITIAL_METADATA_INDEX = 1;
@@ -148,6 +161,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
static const int STATUS_INDEX = 4;
static const int STATUS_CODE_INDEX = 0;
static const int STATUS_DETAILS_INDEX = 1;
+ static const int WRITE_FLAGS_INDEX = 5;
int type;
Py_ssize_t message_size;
char *message;
@@ -170,7 +184,11 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
return 0;
}
c_op.op = type;
- c_op.flags = 0;
+ c_op.reserved = NULL;
+ c_op.flags = PyInt_AsLong(PyTuple_GET_ITEM(op, WRITE_FLAGS_INDEX));
+ if (PyErr_Occurred()) {
+ return 0;
+ }
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
if (!pygrpc_cast_pyseq_to_send_metadata(
diff --git a/src/python/grpcio/grpc/_adapter/_intermediary_low.py b/src/python/grpcio/grpc/_adapter/_intermediary_low.py
index 3c7f0a2619..e7bf9dc462 100644
--- a/src/python/grpcio/grpc/_adapter/_intermediary_low.py
+++ b/src/python/grpcio/grpc/_adapter/_intermediary_low.py
@@ -127,7 +127,7 @@ class Call(object):
def write(self, message, tag):
return self._internal.start_batch([
- _types.OpArgs.send_message(message)
+ _types.OpArgs.send_message(message, 0)
], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED))
def complete(self, tag):
diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py
index dcf67dbc11..147086e725 100644
--- a/src/python/grpcio/grpc/_adapter/_low.py
+++ b/src/python/grpcio/grpc/_adapter/_low.py
@@ -27,9 +27,12 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+from grpc import _grpcio_metadata
from grpc._adapter import _c
from grpc._adapter import _types
+_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
+
ClientCredentials = _c.ClientCredentials
ServerCredentials = _c.ServerCredentials
@@ -72,10 +75,14 @@ class Call(_types.Call):
else:
return self.call.cancel(code, details)
+ def peer(self):
+ return self.call.peer()
+
class Channel(_types.Channel):
def __init__(self, target, args, creds=None):
+ args = list(args) + [(_c.PRIMARY_USER_AGENT_KEY, _USER_AGENT)]
if creds is None:
self.channel = _c.Channel(target, args)
else:
@@ -84,6 +91,17 @@ class Channel(_types.Channel):
def create_call(self, completion_queue, method, host, deadline=None):
return Call(self.channel.create_call(completion_queue.completion_queue, method, host, deadline))
+ def check_connectivity_state(self, try_to_connect):
+ return self.channel.check_connectivity_state(try_to_connect)
+
+ def watch_connectivity_state(self, last_observed_state, deadline,
+ completion_queue, tag):
+ self.channel.watch_connectivity_state(
+ last_observed_state, deadline, completion_queue.completion_queue, tag)
+
+ def target(self):
+ return self.channel.target()
+
_NO_TAG = object()
diff --git a/src/python/grpcio/grpc/_adapter/_types.py b/src/python/grpcio/grpc/_adapter/_types.py
index 5ddb1774ea..5470d2de4a 100644
--- a/src/python/grpcio/grpc/_adapter/_types.py
+++ b/src/python/grpcio/grpc/_adapter/_types.py
@@ -31,13 +31,12 @@ import abc
import collections
import enum
-# TODO(atash): decide whether or not to move these enums to the _c module to
-# force build errors with upstream changes.
class GrpcChannelArgumentKeys(enum.Enum):
"""Mirrors keys used in grpc_channel_args for GRPC-specific arguments."""
SSL_TARGET_NAME_OVERRIDE = 'grpc.ssl_target_name_override'
+
@enum.unique
class CallError(enum.IntEnum):
"""Mirrors grpc_call_error in the C core."""
@@ -53,6 +52,7 @@ class CallError(enum.IntEnum):
ERROR_INVALID_FLAGS = 9
ERROR_INVALID_METADATA = 10
+
@enum.unique
class StatusCode(enum.IntEnum):
"""Mirrors grpc_status_code in the C core."""
@@ -74,6 +74,14 @@ class StatusCode(enum.IntEnum):
DATA_LOSS = 15
UNAUTHENTICATED = 16
+
+@enum.unique
+class OpWriteFlags(enum.IntEnum):
+ """Mirrors defined write-flag constants in the C core."""
+ WRITE_BUFFER_HINT = 1
+ WRITE_NO_COMPRESS = 2
+
+
@enum.unique
class OpType(enum.IntEnum):
"""Mirrors grpc_op_type in the C core."""
@@ -86,12 +94,24 @@ class OpType(enum.IntEnum):
RECV_STATUS_ON_CLIENT = 6
RECV_CLOSE_ON_SERVER = 7
+
@enum.unique
class EventType(enum.IntEnum):
"""Mirrors grpc_completion_type in the C core."""
- QUEUE_SHUTDOWN = 0
- QUEUE_TIMEOUT = 1 # if seen on the Python side, something went horridly wrong
- OP_COMPLETE = 2
+ QUEUE_SHUTDOWN = 0
+ QUEUE_TIMEOUT = 1 # if seen on the Python side, something went horridly wrong
+ OP_COMPLETE = 2
+
+
+@enum.unique
+class ConnectivityState(enum.IntEnum):
+ """Mirrors grpc_connectivity_state in the C core."""
+ IDLE = 0
+ CONNECTING = 1
+ READY = 2
+ TRANSIENT_FAILURE = 3
+ FATAL_FAILURE = 4
+
class Status(collections.namedtuple(
'Status', [
@@ -105,6 +125,7 @@ class Status(collections.namedtuple(
details (str): ...
"""
+
class CallDetails(collections.namedtuple(
'CallDetails', [
'method',
@@ -119,6 +140,7 @@ class CallDetails(collections.namedtuple(
deadline (float): ...
"""
+
class OpArgs(collections.namedtuple(
'OpArgs', [
'type',
@@ -126,6 +148,7 @@ class OpArgs(collections.namedtuple(
'trailing_metadata',
'message',
'status',
+ 'write_flags',
])):
"""Arguments passed into a GRPC operation.
@@ -138,39 +161,40 @@ class OpArgs(collections.namedtuple(
message (bytes): Only valid if type == OpType.SEND_MESSAGE, else is None.
status (Status): Only valid if type == OpType.SEND_STATUS_FROM_SERVER, else
is None.
+ write_flags (int): a bit OR'ing of 0 or more OpWriteFlags values.
"""
@staticmethod
def send_initial_metadata(initial_metadata):
- return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None)
+ return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None, 0)
@staticmethod
- def send_message(message):
- return OpArgs(OpType.SEND_MESSAGE, None, None, message, None)
+ def send_message(message, flags):
+ return OpArgs(OpType.SEND_MESSAGE, None, None, message, None, flags)
@staticmethod
def send_close_from_client():
- return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None)
+ return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None, 0)
@staticmethod
def send_status_from_server(trailing_metadata, status_code, status_details):
- return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details))
+ return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details), 0)
@staticmethod
def recv_initial_metadata():
- return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None);
+ return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None, 0);
@staticmethod
def recv_message():
- return OpArgs(OpType.RECV_MESSAGE, None, None, None, None)
+ return OpArgs(OpType.RECV_MESSAGE, None, None, None, None, 0)
@staticmethod
def recv_status_on_client():
- return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None)
+ return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None, 0)
@staticmethod
def recv_close_on_server():
- return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None)
+ return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None, 0)
class OpResult(collections.namedtuple(
@@ -290,6 +314,15 @@ class Call:
"""
return CallError.ERROR
+ @abc.abstractmethod
+ def peer(self):
+ """Get the peer of this call.
+
+ Returns:
+ str: the peer of this call.
+ """
+ return None
+
class Channel:
__metaclass__ = abc.ABCMeta
@@ -321,6 +354,40 @@ class Channel:
"""
return None
+ @abc.abstractmethod
+ def check_connectivity_state(self, try_to_connect):
+ """Check and optionally repair the connectivity state of the channel.
+
+ Args:
+ try_to_connect (bool): whether or not to try to connect the channel if
+ disconnected.
+
+ Returns:
+ ConnectivityState: state of the channel at the time of this invocation.
+ """
+ return None
+
+ @abc.abstractmethod
+ def watch_connectivity_state(self, last_observed_state, deadline,
+ completion_queue, tag):
+ """Watch for connectivity state changes from the last_observed_state.
+
+ Args:
+ last_observed_state (ConnectivityState): ...
+ deadline (float): ...
+ completion_queue (CompletionQueue): ...
+ tag (object) ...
+ """
+
+ @abc.abstractmethod
+ def target(self):
+ """Get the target of this channel.
+
+ Returns:
+ str: the target of this channel.
+ """
+ return None
+
class Server:
__metaclass__ = abc.ABCMeta
diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py
index 7783e91824..5c636d61ab 100644
--- a/src/python/grpcio/grpc/_links/service.py
+++ b/src/python/grpcio/grpc/_links/service.py
@@ -44,7 +44,10 @@ from grpc.framework.interfaces.links import links
@enum.unique
class _Read(enum.Enum):
READING = 'reading'
- AWAITING_ALLOWANCE = 'awaiting allowance'
+ # TODO(issue 2916): This state will again be necessary after eliminating the
+ # "early_read" field of _RPCState and going back to only reading when granted
+ # allowance to read.
+ # AWAITING_ALLOWANCE = 'awaiting allowance'
CLOSED = 'closed'
@@ -67,12 +70,15 @@ class _RPCState(object):
def __init__(
self, request_deserializer, response_serializer, sequence_number, read,
- allowance, high_write, low_write, premetadataed, terminal_metadata, code,
- message):
+ early_read, allowance, high_write, low_write, premetadataed,
+ terminal_metadata, code, message):
self.request_deserializer = request_deserializer
self.response_serializer = response_serializer
self.sequence_number = sequence_number
self.read = read
+ # TODO(issue 2916): Eliminate this by eliminating the necessity of calling
+ # call.read just to advance the RPC.
+ self.early_read = early_read # A raw (not deserialized) read.
self.allowance = allowance
self.high_write = high_write
self.low_write = low_write
@@ -120,7 +126,7 @@ class _Kernel(object):
call.read(call)
self._rpc_states[call] = _RPCState(
- request_deserializer, response_serializer, 1, _Read.READING, 0,
+ request_deserializer, response_serializer, 1, _Read.READING, None, 1,
_HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None)
ticket = links.Ticket(
call, 0, group, method, links.Ticket.Subscription.FULL,
@@ -140,12 +146,15 @@ class _Kernel(object):
termination = links.Ticket.Termination.COMPLETION
else:
if 0 < rpc_state.allowance:
+ payload = rpc_state.request_deserializer(event.bytes)
+ termination = None
rpc_state.allowance -= 1
call.read(call)
else:
- rpc_state.read = _Read.AWAITING_ALLOWANCE
- payload = rpc_state.request_deserializer(event.bytes)
- termination = None
+ rpc_state.early_read = event.bytes
+ return
+ # TODO(issue 2916): Instead of returning:
+ # rpc_state.read = _Read.AWAITING_ALLOWANCE
ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, None, None,
payload, None, None, None, termination)
@@ -237,12 +246,22 @@ class _Kernel(object):
rpc_state.premetadataed = True
if ticket.allowance is not None:
- if rpc_state.read is _Read.AWAITING_ALLOWANCE:
- rpc_state.allowance += ticket.allowance - 1
- call.read(call)
- rpc_state.read = _Read.READING
- else:
+ if rpc_state.early_read is None:
rpc_state.allowance += ticket.allowance
+ else:
+ payload = rpc_state.request_deserializer(rpc_state.early_read)
+ rpc_state.allowance += ticket.allowance - 1
+ rpc_state.early_read = None
+ if rpc_state.read is _Read.READING:
+ call.read(call)
+ termination = None
+ else:
+ termination = links.Ticket.Termination.COMPLETION
+ ticket = links.Ticket(
+ call, rpc_state.sequence_number, None, None, None, None, None,
+ None, payload, None, None, None, termination)
+ rpc_state.sequence_number += 1
+ self._relay.add_value(ticket)
if ticket.payload is not None:
call.write(rpc_state.response_serializer(ticket.payload), call)
diff --git a/src/python/grpcio/grpc/early_adopter/implementations.py b/src/python/grpcio/grpc/early_adopter/implementations.py
index 10919fae69..9c396aa7ad 100644
--- a/src/python/grpcio/grpc/early_adopter/implementations.py
+++ b/src/python/grpcio/grpc/early_adopter/implementations.py
@@ -41,13 +41,15 @@ from grpc.framework.base import util as _base_utilities
from grpc.framework.face import implementations as _face_implementations
from grpc.framework.foundation import logging_pool
-_THREAD_POOL_SIZE = 8
+_DEFAULT_THREAD_POOL_SIZE = 8
_ONE_DAY_IN_SECONDS = 24 * 60 * 60
class _Server(interfaces.Server):
- def __init__(self, breakdown, port, private_key, certificate_chain):
+ def __init__(
+ self, breakdown, port, private_key, certificate_chain,
+ thread_pool_size=_DEFAULT_THREAD_POOL_SIZE):
self._lock = threading.Lock()
self._breakdown = breakdown
self._port = port
@@ -56,6 +58,7 @@ class _Server(interfaces.Server):
else:
self._key_chain_pairs = ((private_key, certificate_chain),)
+ self._pool_size = thread_pool_size
self._pool = None
self._back = None
self._fore_link = None
@@ -63,7 +66,7 @@ class _Server(interfaces.Server):
def _start(self):
with self._lock:
if self._pool is None:
- self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ self._pool = logging_pool.pool(self._pool_size)
servicer = _face_implementations.servicer(
self._pool, self._breakdown.implementations, None)
self._back = _base_implementations.back_link(
@@ -114,7 +117,8 @@ class _Stub(interfaces.Stub):
def __init__(
self, breakdown, host, port, secure, root_certificates, private_key,
- certificate_chain, metadata_transformer=None, server_host_override=None):
+ certificate_chain, metadata_transformer=None, server_host_override=None,
+ thread_pool_size=_DEFAULT_THREAD_POOL_SIZE):
self._lock = threading.Lock()
self._breakdown = breakdown
self._host = host
@@ -126,6 +130,7 @@ class _Stub(interfaces.Stub):
self._metadata_transformer = metadata_transformer
self._server_host_override = server_host_override
+ self._pool_size = thread_pool_size
self._pool = None
self._front = None
self._rear_link = None
@@ -134,7 +139,7 @@ class _Stub(interfaces.Stub):
def __enter__(self):
with self._lock:
if self._pool is None:
- self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ self._pool = logging_pool.pool(self._pool_size)
self._front = _base_implementations.front_link(
self._pool, self._pool, self._pool)
self._rear_link = _rear.RearLink(
@@ -193,7 +198,7 @@ class _Stub(interfaces.Stub):
def stub(
service_name, methods, host, port, metadata_transformer=None, secure=False,
root_certificates=None, private_key=None, certificate_chain=None,
- server_host_override=None):
+ server_host_override=None, thread_pool_size=_DEFAULT_THREAD_POOL_SIZE):
"""Constructs an interfaces.Stub.
Args:
@@ -216,6 +221,8 @@ def stub(
certificate chain should be used.
server_host_override: (For testing only) the target name used for SSL
host name checking.
+ thread_pool_size: The maximum number of threads to allow in the backing
+ thread pool.
Returns:
An interfaces.Stub affording RPC invocation.
@@ -224,11 +231,13 @@ def stub(
return _Stub(
breakdown, host, port, secure, root_certificates, private_key,
certificate_chain, server_host_override=server_host_override,
- metadata_transformer=metadata_transformer)
+ metadata_transformer=metadata_transformer,
+ thread_pool_size=thread_pool_size)
def server(
- service_name, methods, port, private_key=None, certificate_chain=None):
+ service_name, methods, port, private_key=None, certificate_chain=None,
+ thread_pool_size=_DEFAULT_THREAD_POOL_SIZE):
"""Constructs an interfaces.Server.
Args:
@@ -242,9 +251,12 @@ def server(
private_key: A pem-encoded private key, or None for an insecure server.
certificate_chain: A pem-encoded certificate chain, or None for an insecure
server.
+ thread_pool_size: The maximum number of threads to allow in the backing
+ thread pool.
Returns:
An interfaces.Server that will serve secure traffic.
"""
breakdown = _face_utilities.break_down_service(service_name, methods)
- return _Server(breakdown, port, private_key, certificate_chain)
+ return _Server(breakdown, port, private_key, certificate_chain,
+ thread_pool_size=thread_pool_size)
diff --git a/src/python/grpcio/grpc/framework/core/__init__.py b/src/python/grpcio/grpc/framework/core/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py
new file mode 100644
index 0000000000..d3be3a4c4a
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_constants.py
@@ -0,0 +1,59 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Private constants for the package."""
+
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+
+TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = {
+ base.Subscription.Kind.NONE: links.Ticket.Subscription.NONE,
+ base.Subscription.Kind.TERMINATION_ONLY:
+ links.Ticket.Subscription.TERMINATION,
+ base.Subscription.Kind.FULL: links.Ticket.Subscription.FULL,
+ }
+
+# Mapping from abortive operation outcome to ticket termination to be
+# sent to the other side of the operation, or None to indicate that no
+# ticket should be sent to the other side in the event of such an
+# outcome.
+ABORTION_OUTCOME_TO_TICKET_TERMINATION = {
+ base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION,
+ base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION,
+ base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN,
+ base.Outcome.REMOTE_SHUTDOWN: None,
+ base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE,
+ base.Outcome.TRANSMISSION_FAILURE: None,
+ base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE,
+ base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE,
+}
+
+INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:'
+TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE = (
+ 'Exception calling termination callback!')
diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py
new file mode 100644
index 0000000000..24a12b612e
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_context.py
@@ -0,0 +1,92 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation context."""
+
+import time
+
+# _interfaces is referenced from specification in this module.
+from grpc.framework.core import _interfaces # pylint: disable=unused-import
+from grpc.framework.interfaces.base import base
+
+
+class OperationContext(base.OperationContext):
+ """An implementation of interfaces.OperationContext."""
+
+ def __init__(
+ self, lock, termination_manager, transmission_manager,
+ expiration_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ """
+ self._lock = lock
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = expiration_manager
+
+ def _abort(self, outcome):
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
+ self._expiration_manager.terminate()
+
+ def outcome(self):
+ """See base.OperationContext.outcome for specification."""
+ with self._lock:
+ return self._termination_manager.outcome
+
+ def add_termination_callback(self, callback):
+ """See base.OperationContext.add_termination_callback."""
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._termination_manager.add_callback(callback)
+ return None
+ else:
+ return self._termination_manager.outcome
+
+ def time_remaining(self):
+ """See base.OperationContext.time_remaining for specification."""
+ with self._lock:
+ deadline = self._expiration_manager.deadline()
+ return max(0.0, deadline - time.time())
+
+ def cancel(self):
+ """See base.OperationContext.cancel for specification."""
+ self._abort(base.Outcome.CANCELLED)
+
+ def fail(self, exception):
+ """See base.OperationContext.fail for specification."""
+ self._abort(base.Outcome.LOCAL_FAILURE)
diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py
new file mode 100644
index 0000000000..7c702ab2ce
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_emission.py
@@ -0,0 +1,97 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for handling emitted values."""
+
+from grpc.framework.core import _interfaces
+from grpc.framework.interfaces.base import base
+
+
+class EmissionManager(_interfaces.EmissionManager):
+ """An EmissionManager implementation."""
+
+ def __init__(
+ self, lock, termination_manager, transmission_manager,
+ expiration_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ """
+ self._lock = lock
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = expiration_manager
+ self._ingestion_manager = None
+
+ self._initial_metadata_seen = False
+ self._payload_seen = False
+ self._completion_seen = False
+
+ def set_ingestion_manager(self, ingestion_manager):
+ """Sets the ingestion manager with which this manager will cooperate.
+
+ Args:
+ ingestion_manager: The _interfaces.IngestionManager for the operation.
+ """
+ self._ingestion_manager = ingestion_manager
+
+ def advance(
+ self, initial_metadata=None, payload=None, completion=None,
+ allowance=None):
+ initial_metadata_present = initial_metadata is not None
+ payload_present = payload is not None
+ completion_present = completion is not None
+ allowance_present = allowance is not None
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ if (initial_metadata_present and (
+ self._initial_metadata_seen or self._payload_seen or
+ self._completion_seen) or
+ payload_present and self._completion_seen or
+ completion_present and self._completion_seen or
+ allowance_present and allowance <= 0):
+ self._termination_manager.abort(base.Outcome.LOCAL_FAILURE)
+ self._transmission_manager.abort(base.Outcome.LOCAL_FAILURE)
+ self._expiration_manager.terminate()
+ else:
+ self._initial_metadata_seen |= initial_metadata_present
+ self._payload_seen |= payload_present
+ self._completion_seen |= completion_present
+ if completion_present:
+ self._termination_manager.emission_complete()
+ self._ingestion_manager.local_emissions_done()
+ self._transmission_manager.advance(
+ initial_metadata, payload, completion, allowance)
+ if allowance_present:
+ self._ingestion_manager.add_local_allowance(allowance)
diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py
new file mode 100644
index 0000000000..fb2c532df6
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_end.py
@@ -0,0 +1,251 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementation of base.End."""
+
+import abc
+import enum
+import threading
+import uuid
+
+from grpc.framework.core import _operation
+from grpc.framework.core import _utilities
+from grpc.framework.foundation import callable_util
+from grpc.framework.foundation import later
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+from grpc.framework.interfaces.links import utilities
+
+_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
+
+
+class End(base.End, links.Link):
+ """A bridge between base.End and links.Link.
+
+ Implementations of this interface translate arriving tickets into
+ calls on application objects implementing base interfaces and
+ translate calls from application objects implementing base interfaces
+ into tickets sent to a joined link.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class _Cycle(object):
+ """State for a single start-stop End lifecycle."""
+
+ def __init__(self, pool):
+ self.pool = pool
+ self.grace = False
+ self.futures = []
+ self.operations = {}
+ self.idle_actions = []
+
+
+def _abort(operations):
+ for operation in operations:
+ operation.abort(base.Outcome.LOCAL_SHUTDOWN)
+
+
+def _cancel_futures(futures):
+ for future in futures:
+ futures.cancel()
+
+
+def _future_shutdown(lock, cycle, event):
+ def in_future():
+ with lock:
+ _abort(cycle.operations.values())
+ _cancel_futures(cycle.futures)
+ pool = cycle.pool
+ cycle.pool.shutdown(wait=True)
+ return in_future
+
+
+def _termination_action(lock, stats, operation_id, cycle):
+ """Constructs the termination action for a single operation.
+
+ Args:
+ lock: A lock to hold during the termination action.
+ states: A mapping from base.Outcome values to integers to increment with
+ the outcome given to the termination action.
+ operation_id: The operation ID for the termination action.
+ cycle: A _Cycle value to be updated during the termination action.
+
+ Returns:
+ A callable that takes an operation outcome as its sole parameter and that
+ should be used as the termination action for the operation associated
+ with the given operation ID.
+ """
+ def termination_action(outcome):
+ with lock:
+ stats[outcome] += 1
+ cycle.operations.pop(operation_id, None)
+ if not cycle.operations:
+ for action in cycle.idle_actions:
+ cycle.pool.submit(action)
+ cycle.idle_actions = []
+ if cycle.grace:
+ _cancel_futures(cycle.futures)
+ return termination_action
+
+
+class _End(End):
+ """An End implementation."""
+
+ def __init__(self, servicer_package):
+ """Constructor.
+
+ Args:
+ servicer_package: A _ServicerPackage for servicing operations or None if
+ this end will not be used to service operations.
+ """
+ self._lock = threading.Condition()
+ self._servicer_package = servicer_package
+
+ self._stats = {outcome: 0 for outcome in base.Outcome}
+
+ self._mate = None
+
+ self._cycle = None
+
+ def start(self):
+ """See base.End.start for specification."""
+ with self._lock:
+ if self._cycle is not None:
+ raise ValueError('Tried to start a not-stopped End!')
+ else:
+ self._cycle = _Cycle(logging_pool.pool(1))
+
+ def stop(self, grace):
+ """See base.End.stop for specification."""
+ with self._lock:
+ if self._cycle is None:
+ event = threading.Event()
+ event.set()
+ return event
+ elif not self._cycle.operations:
+ event = threading.Event()
+ self._cycle.pool.submit(event.set)
+ self._cycle.pool.shutdown(wait=False)
+ self._cycle = None
+ return event
+ else:
+ self._cycle.grace = True
+ event = threading.Event()
+ self._cycle.idle_actions.append(event.set)
+ if 0 < grace:
+ future = later.later(
+ grace, _future_shutdown(self._lock, self._cycle, event))
+ self._cycle.futures.append(future)
+ else:
+ _abort(self._cycle.operations.values())
+ return event
+
+ def operate(
+ self, group, method, subscription, timeout, initial_metadata=None,
+ payload=None, completion=None):
+ """See base.End.operate for specification."""
+ operation_id = uuid.uuid4()
+ with self._lock:
+ if self._cycle is None or self._cycle.grace:
+ raise ValueError('Can\'t operate on stopped or stopping End!')
+ termination_action = _termination_action(
+ self._lock, self._stats, operation_id, self._cycle)
+ operation = _operation.invocation_operate(
+ operation_id, group, method, subscription, timeout, initial_metadata,
+ payload, completion, self._mate.accept_ticket, termination_action,
+ self._cycle.pool)
+ self._cycle.operations[operation_id] = operation
+ return operation.context, operation.operator
+
+ def operation_stats(self):
+ """See base.End.operation_stats for specification."""
+ with self._lock:
+ return dict(self._stats)
+
+ def add_idle_action(self, action):
+ """See base.End.add_idle_action for specification."""
+ with self._lock:
+ if self._cycle is None:
+ raise ValueError('Can\'t add idle action to stopped End!')
+ action_with_exceptions_logged = callable_util.with_exceptions_logged(
+ action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)
+ if self._cycle.operations:
+ self._cycle.idle_actions.append(action_with_exceptions_logged)
+ else:
+ self._cycle.pool.submit(action_with_exceptions_logged)
+
+ def accept_ticket(self, ticket):
+ """See links.Link.accept_ticket for specification."""
+ with self._lock:
+ if self._cycle is not None and not self._cycle.grace:
+ operation = self._cycle.operations.get(ticket.operation_id)
+ if operation is not None:
+ operation.handle_ticket(ticket)
+ elif self._servicer_package is not None:
+ termination_action = _termination_action(
+ self._lock, self._stats, ticket.operation_id, self._cycle)
+ operation = _operation.service_operate(
+ self._servicer_package, ticket, self._mate.accept_ticket,
+ termination_action, self._cycle.pool)
+ if operation is not None:
+ self._cycle.operations[ticket.operation_id] = operation
+
+ def join_link(self, link):
+ """See links.Link.join_link for specification."""
+ with self._lock:
+ self._mate = utilities.NULL_LINK if link is None else link
+
+
+def serviceless_end_link():
+ """Constructs an End usable only for invoking operations.
+
+ Returns:
+ An End usable for translating operations into ticket exchange.
+ """
+ return _End(None)
+
+
+def serviceful_end_link(servicer, default_timeout, maximum_timeout):
+ """Constructs an End capable of servicing operations.
+
+ Args:
+ servicer: An interfaces.Servicer for servicing operations.
+ default_timeout: A length of time in seconds to be used as the default
+ time alloted for a single operation.
+ maximum_timeout: A length of time in seconds to be used as the maximum
+ time alloted for a single operation.
+
+ Returns:
+ An End capable of servicing the operations requested of it through ticket
+ exchange.
+ """
+ return _End(
+ _utilities.ServicerPackage(servicer, default_timeout, maximum_timeout))
diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py
new file mode 100644
index 0000000000..d94bdf2d2b
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_expiration.py
@@ -0,0 +1,152 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation expiration."""
+
+import time
+
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import later
+from grpc.framework.interfaces.base import base
+
+
+class _ExpirationManager(_interfaces.ExpirationManager):
+ """An implementation of _interfaces.ExpirationManager."""
+
+ def __init__(
+ self, commencement, timeout, maximum_timeout, lock, termination_manager,
+ transmission_manager):
+ """Constructor.
+
+ Args:
+ commencement: The time in seconds since the epoch at which the operation
+ began.
+ timeout: A length of time in seconds to allow for the operation to run.
+ maximum_timeout: The maximum length of time in seconds to allow for the
+ operation to run despite what is requested via this object's
+ change_timout method.
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ """
+ self._lock = lock
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._commencement = commencement
+ self._maximum_timeout = maximum_timeout
+
+ self._timeout = timeout
+ self._deadline = commencement + timeout
+ self._index = None
+ self._future = None
+
+ def _expire(self, index):
+ def expire():
+ with self._lock:
+ if self._future is not None and index == self._index:
+ self._future = None
+ self._termination_manager.expire()
+ self._transmission_manager.abort(base.Outcome.EXPIRED)
+ return expire
+
+ def start(self):
+ self._index = 0
+ self._future = later.later(self._timeout, self._expire(0))
+
+ def change_timeout(self, timeout):
+ if self._future is not None and timeout != self._timeout:
+ self._future.cancel()
+ new_timeout = min(timeout, self._maximum_timeout)
+ new_index = self._index + 1
+ self._timeout = new_timeout
+ self._deadline = self._commencement + new_timeout
+ self._index = new_index
+ delay = self._deadline - time.time()
+ self._future = later.later(delay, self._expire(new_index))
+ if new_timeout != timeout:
+ self._transmission_manager.timeout(new_timeout)
+
+ def deadline(self):
+ return self._deadline
+
+ def terminate(self):
+ if self._future:
+ self._future.cancel()
+ self._future = None
+ self._deadline_index = None
+
+
+def invocation_expiration_manager(
+ timeout, lock, termination_manager, transmission_manager):
+ """Creates an _interfaces.ExpirationManager appropriate for front-side use.
+
+ Args:
+ timeout: A length of time in seconds to allow for the operation to run.
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+
+ Returns:
+ An _interfaces.ExpirationManager appropriate for invocation-side use.
+ """
+ expiration_manager = _ExpirationManager(
+ time.time(), timeout, timeout, lock, termination_manager,
+ transmission_manager)
+ expiration_manager.start()
+ return expiration_manager
+
+
+def service_expiration_manager(
+ timeout, default_timeout, maximum_timeout, lock, termination_manager,
+ transmission_manager):
+ """Creates an _interfaces.ExpirationManager appropriate for back-side use.
+
+ Args:
+ timeout: A length of time in seconds to allow for the operation to run. May
+ be None in which case default_timeout will be used.
+ default_timeout: The default length of time in seconds to allow for the
+ operation to run if the front-side customer has not specified such a value
+ (or if the value they specified is not yet known).
+ maximum_timeout: The maximum length of time in seconds to allow for the
+ operation to run.
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+
+ Returns:
+ An _interfaces.ExpirationManager appropriate for service-side use.
+ """
+ expiration_manager = _ExpirationManager(
+ time.time(), default_timeout if timeout is None else timeout,
+ maximum_timeout, lock, termination_manager, transmission_manager)
+ expiration_manager.start()
+ return expiration_manager
diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py
new file mode 100644
index 0000000000..59f7f8adc8
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_ingestion.py
@@ -0,0 +1,410 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ingestion during an operation."""
+
+import abc
+import collections
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import abandonment
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+
+_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!'
+_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
+
+
+class _SubscriptionCreation(collections.namedtuple(
+ '_SubscriptionCreation', ('subscription', 'remote_error', 'abandoned'))):
+ """A sum type for the outcome of ingestion initialization.
+
+ Either subscription will be non-None, remote_error will be True, or abandoned
+ will be True.
+
+ Attributes:
+ subscription: A base.Subscription describing the customer's interest in
+ operation values from the other side.
+ remote_error: A boolean indicating that the subscription could not be
+ created due to an error on the remote side of the operation.
+ abandoned: A boolean indicating that subscription creation was abandoned.
+ """
+
+
+class _SubscriptionCreator(object):
+ """Common specification of subscription-creating behavior."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def create(self, group, method):
+ """Creates the base.Subscription of the local customer.
+
+ Any exceptions raised by this method should be attributed to and treated as
+ defects in the customer code called by this method.
+
+ Args:
+ group: The group identifier of the operation.
+ method: The method identifier of the operation.
+
+ Returns:
+ A _SubscriptionCreation describing the result of subscription creation.
+ """
+ raise NotImplementedError()
+
+
+class _ServiceSubscriptionCreator(_SubscriptionCreator):
+ """A _SubscriptionCreator appropriate for service-side use."""
+
+ def __init__(self, servicer, operation_context, output_operator):
+ """Constructor.
+
+ Args:
+ servicer: The base.Servicer that will service the operation.
+ operation_context: A base.OperationContext for the operation to be passed
+ to the customer.
+ output_operator: A base.Operator for the operation to be passed to the
+ customer and to be called by the customer to accept operation data
+ emitted by the customer.
+ """
+ self._servicer = servicer
+ self._operation_context = operation_context
+ self._output_operator = output_operator
+
+ def create(self, group, method):
+ try:
+ subscription = self._servicer.service(
+ group, method, self._operation_context, self._output_operator)
+ except base.NoSuchMethodError:
+ return _SubscriptionCreation(None, True, False)
+ except abandonment.Abandoned:
+ return _SubscriptionCreation(None, False, True)
+ else:
+ return _SubscriptionCreation(subscription, False, False)
+
+
+def _wrap(behavior):
+ def wrapped(*args, **kwargs):
+ try:
+ behavior(*args, **kwargs)
+ except abandonment.Abandoned:
+ return False
+ else:
+ return True
+ return wrapped
+
+
+class _IngestionManager(_interfaces.IngestionManager):
+ """An implementation of _interfaces.IngestionManager."""
+
+ def __init__(
+ self, lock, pool, subscription, subscription_creator, termination_manager,
+ transmission_manager, expiration_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ pool: A thread pool in which to execute customer code.
+ subscription: A base.Subscription describing the customer's interest in
+ operation values from the other side. May be None if
+ subscription_creator is not None.
+ subscription_creator: A _SubscriptionCreator wrapping the portion of
+ customer code that when called returns the base.Subscription describing
+ the customer's interest in operation values from the other side. May be
+ None if subscription is not None.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ """
+ self._lock = lock
+ self._pool = pool
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = expiration_manager
+
+ if subscription is None:
+ self._subscription_creator = subscription_creator
+ self._wrapped_operator = None
+ elif subscription.kind is base.Subscription.Kind.FULL:
+ self._subscription_creator = None
+ self._wrapped_operator = _wrap(subscription.operator.advance)
+ else:
+ # TODO(nathaniel): Support other subscriptions.
+ raise ValueError('Unsupported subscription "%s"!' % subscription.kind)
+ self._pending_initial_metadata = None
+ self._pending_payloads = []
+ self._pending_completion = None
+ self._local_allowance = 1
+ # A nonnegative integer or None, with None indicating that the local
+ # customer is done emitting anyway so there's no need to bother it by
+ # informing it that the remote customer has granted it further permission to
+ # emit.
+ self._remote_allowance = 0
+ self._processing = False
+
+ def _abort_internal_only(self):
+ self._subscription_creator = None
+ self._wrapped_operator = None
+ self._pending_initial_metadata = None
+ self._pending_payloads = None
+ self._pending_completion = None
+
+ def _abort_and_notify(self, outcome):
+ self._abort_internal_only()
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
+ self._expiration_manager.terminate()
+
+ def _operator_next(self):
+ """Computes the next step for full-subscription ingestion.
+
+ Returns:
+ An initial_metadata, payload, completion, allowance, continue quintet
+ indicating what operation values (if any) are available to pass into
+ customer code and whether or not there is anything immediately
+ actionable to call customer code to do.
+ """
+ if self._wrapped_operator is None:
+ return None, None, None, None, False
+ else:
+ initial_metadata, payload, completion, allowance, action = [None] * 5
+ if self._pending_initial_metadata is not None:
+ initial_metadata = self._pending_initial_metadata
+ self._pending_initial_metadata = None
+ action = True
+ if self._pending_payloads and 0 < self._local_allowance:
+ payload = self._pending_payloads.pop(0)
+ self._local_allowance -= 1
+ action = True
+ if not self._pending_payloads and self._pending_completion is not None:
+ completion = self._pending_completion
+ self._pending_completion = None
+ action = True
+ if self._remote_allowance is not None and 0 < self._remote_allowance:
+ allowance = self._remote_allowance
+ self._remote_allowance = 0
+ action = True
+ return initial_metadata, payload, completion, allowance, bool(action)
+
+ def _operator_process(
+ self, wrapped_operator, initial_metadata, payload,
+ completion, allowance):
+ while True:
+ advance_outcome = callable_util.call_logging_exceptions(
+ wrapped_operator, _INGESTION_EXCEPTION_LOG_MESSAGE,
+ initial_metadata=initial_metadata, payload=payload,
+ completion=completion, allowance=allowance)
+ if advance_outcome.exception is None:
+ if advance_outcome.return_value:
+ with self._lock:
+ if self._termination_manager.outcome is not None:
+ return
+ if completion is not None:
+ self._termination_manager.ingestion_complete()
+ initial_metadata, payload, completion, allowance, moar = (
+ self._operator_next())
+ if not moar:
+ self._processing = False
+ return
+ else:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+ return
+ else:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+ return
+
+ def _operator_post_create(self, subscription):
+ wrapped_operator = _wrap(subscription.operator.advance)
+ with self._lock:
+ if self._termination_manager.outcome is not None:
+ return
+ self._wrapped_operator = wrapped_operator
+ self._subscription_creator = None
+ metadata, payload, completion, allowance, moar = self._operator_next()
+ if not moar:
+ self._processing = False
+ return
+ self._operator_process(
+ wrapped_operator, metadata, payload, completion, allowance)
+
+ def _create(self, subscription_creator, group, name):
+ outcome = callable_util.call_logging_exceptions(
+ subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE,
+ group, name)
+ if outcome.return_value is None:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+ elif outcome.return_value.abandoned:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+ elif outcome.return_value.remote_error:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._abort_and_notify(base.Outcome.REMOTE_FAILURE)
+ elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL:
+ self._operator_post_create(outcome.return_value.subscription)
+ else:
+ # TODO(nathaniel): Support other subscriptions.
+ raise ValueError(
+ 'Unsupported "%s"!' % outcome.return_value.subscription.kind)
+
+ def _store_advance(self, initial_metadata, payload, completion, allowance):
+ if initial_metadata is not None:
+ self._pending_initial_metadata = initial_metadata
+ if payload is not None:
+ self._pending_payloads.append(payload)
+ if completion is not None:
+ self._pending_completion = completion
+ if allowance is not None and self._remote_allowance is not None:
+ self._remote_allowance += allowance
+
+ def _operator_advance(self, initial_metadata, payload, completion, allowance):
+ if self._processing:
+ self._store_advance(initial_metadata, payload, completion, allowance)
+ else:
+ action = False
+ if initial_metadata is not None:
+ action = True
+ if payload is not None:
+ if 0 < self._local_allowance:
+ self._local_allowance -= 1
+ action = True
+ else:
+ self._pending_payloads.append(payload)
+ payload = False
+ if completion is not None:
+ if self._pending_payloads:
+ self._pending_completion = completion
+ else:
+ action = True
+ if allowance is not None and self._remote_allowance is not None:
+ allowance += self._remote_allowance
+ self._remote_allowance = 0
+ action = True
+ if action:
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ self._operator_process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ self._wrapped_operator, initial_metadata, payload, completion,
+ allowance)
+
+ def set_group_and_method(self, group, method):
+ """See _interfaces.IngestionManager.set_group_and_method for spec."""
+ if self._subscription_creator is not None and not self._processing:
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ self._create, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ self._subscription_creator, group, method)
+ self._processing = True
+
+ def add_local_allowance(self, allowance):
+ """See _interfaces.IngestionManager.add_local_allowance for spec."""
+ if any((self._subscription_creator, self._wrapped_operator,)):
+ self._local_allowance += allowance
+ if not self._processing:
+ initial_metadata, payload, completion, allowance, moar = (
+ self._operator_next())
+ if moar:
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ self._operator_process,
+ _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ initial_metadata, payload, completion, allowance)
+
+ def local_emissions_done(self):
+ self._remote_allowance = None
+
+ def advance(self, initial_metadata, payload, completion, allowance):
+ """See _interfaces.IngestionManager.advance for specification."""
+ if self._subscription_creator is not None:
+ self._store_advance(initial_metadata, payload, completion, allowance)
+ elif self._wrapped_operator is not None:
+ self._operator_advance(initial_metadata, payload, completion, allowance)
+
+
+def invocation_ingestion_manager(
+ subscription, lock, pool, termination_manager, transmission_manager,
+ expiration_manager):
+ """Creates an IngestionManager appropriate for invocation-side use.
+
+ Args:
+ subscription: A base.Subscription indicating the customer's interest in the
+ data and results from the service-side of the operation.
+ lock: The operation-wide lock.
+ pool: A thread pool in which to execute customer code.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+
+ Returns:
+ An IngestionManager appropriate for invocation-side use.
+ """
+ return _IngestionManager(
+ lock, pool, subscription, None, termination_manager, transmission_manager,
+ expiration_manager)
+
+
+def service_ingestion_manager(
+ servicer, operation_context, output_operator, lock, pool,
+ termination_manager, transmission_manager, expiration_manager):
+ """Creates an IngestionManager appropriate for service-side use.
+
+ The returned IngestionManager will require its set_group_and_name method to be
+ called before its advance method may be called.
+
+ Args:
+ servicer: A base.Servicer for servicing the operation.
+ operation_context: A base.OperationContext for the operation to be passed to
+ the customer.
+ output_operator: A base.Operator for the operation to be passed to the
+ customer and to be called by the customer to accept operation data output
+ by the customer.
+ lock: The operation-wide lock.
+ pool: A thread pool in which to execute customer code.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+
+ Returns:
+ An IngestionManager appropriate for service-side use.
+ """
+ subscription_creator = _ServiceSubscriptionCreator(
+ servicer, operation_context, output_operator)
+ return _IngestionManager(
+ lock, pool, None, subscription_creator, termination_manager,
+ transmission_manager, expiration_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py
new file mode 100644
index 0000000000..a626b9f767
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_interfaces.py
@@ -0,0 +1,308 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Package-internal interfaces."""
+
+import abc
+
+from grpc.framework.interfaces.base import base
+
+
+class TerminationManager(object):
+ """An object responsible for handling the termination of an operation.
+
+ Attributes:
+ outcome: None if the operation is active or a base.Outcome value if it has
+ terminated.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def add_callback(self, callback):
+ """Registers a callback to be called on operation termination.
+
+ If the operation has already terminated the callback will not be called.
+
+ Args:
+ callback: A callable that will be passed an interfaces.Outcome value.
+
+ Returns:
+ None if the operation has not yet terminated and the passed callback will
+ be called when it does, or a base.Outcome value describing the operation
+ termination if the operation has terminated and the callback will not be
+ called as a result of this method call.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def emission_complete(self):
+ """Indicates that emissions from customer code have completed."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def transmission_complete(self):
+ """Indicates that transmissions to the remote end are complete.
+
+ Returns:
+ True if the operation has terminated or False if the operation remains
+ ongoing.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def reception_complete(self):
+ """Indicates that reception from the other side is complete."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def ingestion_complete(self):
+ """Indicates that customer code ingestion of received values is complete."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def expire(self):
+ """Indicates that the operation must abort because it has taken too long."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def abort(self, outcome):
+ """Indicates that the operation must abort for the indicated reason.
+
+ Args:
+ outcome: An interfaces.Outcome indicating operation abortion.
+ """
+ raise NotImplementedError()
+
+
+class TransmissionManager(object):
+ """A manager responsible for transmitting to the other end of an operation."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def kick_off(
+ self, group, method, timeout, initial_metadata, payload, completion,
+ allowance):
+ """Transmits the values associated with operation invocation."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def advance(self, initial_metadata, payload, completion, allowance):
+ """Accepts values for transmission to the other end of the operation.
+
+ Args:
+ initial_metadata: An initial metadata value to be transmitted to the other
+ side of the operation. May only ever be non-None once.
+ payload: A payload value.
+ completion: A base.Completion value. May only ever be non-None in the last
+ transmission to be made to the other side.
+ allowance: A positive integer communicating the number of additional
+ payloads allowed to be transmitted from the other side to this side of
+ the operation, or None if no additional allowance is being granted in
+ this call.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def timeout(self, timeout):
+ """Accepts for transmission to the other side a new timeout value.
+
+ Args:
+ timeout: A positive float used as the new timeout value for the operation
+ to be transmitted to the other side.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def allowance(self, allowance):
+ """Indicates to this manager that the remote customer is allowing payloads.
+
+ Args:
+ allowance: A positive integer indicating the number of additional payloads
+ the remote customer is allowing to be transmitted from this side of the
+ operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def remote_complete(self):
+ """Indicates to this manager that data from the remote side is complete."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def abort(self, outcome):
+ """Indicates that the operation has aborted.
+
+ Args:
+ outcome: An interfaces.Outcome for the operation. If None, indicates that
+ the operation abortion should not be communicated to the other side of
+ the operation.
+ """
+ raise NotImplementedError()
+
+
+class ExpirationManager(object):
+ """A manager responsible for aborting the operation if it runs out of time."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def change_timeout(self, timeout):
+ """Changes the timeout allotted for the operation.
+
+ Operation duration is always measure from the beginning of the operation;
+ calling this method changes the operation's allotted time to timeout total
+ seconds, not timeout seconds from the time of this method call.
+
+ Args:
+ timeout: A length of time in seconds to allow for the operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deadline(self):
+ """Returns the time until which the operation is allowed to run.
+
+ Returns:
+ The time (seconds since the epoch) at which the operation will expire.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self):
+ """Indicates to this manager that the operation has terminated."""
+ raise NotImplementedError()
+
+
+class EmissionManager(base.Operator):
+ """A manager of values emitted by customer code."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def advance(
+ self, initial_metadata=None, payload=None, completion=None,
+ allowance=None):
+ """Accepts a value emitted by customer code.
+
+ This method should only be called by customer code.
+
+ Args:
+ initial_metadata: An initial metadata value emitted by the local customer
+ to be sent to the other side of the operation.
+ payload: A payload value emitted by the local customer to be sent to the
+ other side of the operation.
+ completion: A Completion value emitted by the local customer to be sent to
+ the other side of the operation.
+ allowance: A positive integer indicating an additional number of payloads
+ that the local customer is willing to accept from the other side of the
+ operation.
+ """
+ raise NotImplementedError()
+
+
+class IngestionManager(object):
+ """A manager responsible for executing customer code.
+
+ This name of this manager comes from its responsibility to pass successive
+ values from the other side of the operation into the code of the local
+ customer.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_group_and_method(self, group, method):
+ """Communicates to this IngestionManager the operation group and method.
+
+ Args:
+ group: The group identifier of the operation.
+ method: The method identifier of the operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_local_allowance(self, allowance):
+ """Communicates to this IngestionManager that more payloads may be ingested.
+
+ Args:
+ allowance: A positive integer indicating an additional number of payloads
+ that the local customer is willing to ingest.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def local_emissions_done(self):
+ """Indicates to this manager that local emissions are done."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def advance(self, initial_metadata, payload, completion, allowance):
+ """Advances the operation by passing values to the local customer."""
+ raise NotImplementedError()
+
+
+class ReceptionManager(object):
+ """A manager responsible for receiving tickets from the other end."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def receive_ticket(self, ticket):
+ """Handle a ticket from the other side of the operation.
+
+ Args:
+ ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket
+ appropriate to this end of the operation and this object.
+ """
+ raise NotImplementedError()
+
+
+class Operation(object):
+ """An ongoing operation.
+
+ Attributes:
+ context: A base.OperationContext object for the operation.
+ operator: A base.Operator object for the operation for use by the customer
+ of the operation.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def handle_ticket(self, ticket):
+ """Handle a ticket from the other side of the operation.
+
+ Args:
+ ticket: A links.Ticket from the other side of the operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def abort(self, outcome):
+ """Aborts the operation.
+
+ Args:
+ outcome: A base.Outcome value indicating operation abortion.
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py
new file mode 100644
index 0000000000..d20e40a53d
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_operation.py
@@ -0,0 +1,192 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementation of operations."""
+
+import threading
+
+# _utilities is referenced from specification in this module.
+from grpc.framework.core import _context
+from grpc.framework.core import _emission
+from grpc.framework.core import _expiration
+from grpc.framework.core import _ingestion
+from grpc.framework.core import _interfaces
+from grpc.framework.core import _reception
+from grpc.framework.core import _termination
+from grpc.framework.core import _transmission
+from grpc.framework.core import _utilities # pylint: disable=unused-import
+
+
+class _EasyOperation(_interfaces.Operation):
+ """A trivial implementation of interfaces.Operation."""
+
+ def __init__(
+ self, lock, termination_manager, transmission_manager, expiration_manager,
+ context, operator, reception_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ context: A base.OperationContext for use by the customer during the
+ operation.
+ operator: A base.Operator for use by the customer during the operation.
+ reception_manager: The _interfaces.ReceptionManager for the operation.
+ """
+ self._lock = lock
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = expiration_manager
+ self._reception_manager = reception_manager
+
+ self.context = context
+ self.operator = operator
+
+ def handle_ticket(self, ticket):
+ with self._lock:
+ self._reception_manager.receive_ticket(ticket)
+
+ def abort(self, outcome):
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
+ self._expiration_manager.terminate()
+
+
+def invocation_operate(
+ operation_id, group, method, subscription, timeout, initial_metadata,
+ payload, completion, ticket_sink, termination_action, pool):
+ """Constructs objects necessary for front-side operation management.
+
+ Args:
+ operation_id: An object identifying the operation.
+ group: The group identifier of the operation.
+ method: The method identifier of the operation.
+ subscription: A base.Subscription describing the customer's interest in the
+ results of the operation.
+ timeout: A length of time in seconds to allow for the operation.
+ initial_metadata: An initial metadata value to be sent to the other side of
+ the operation. May be None if the initial metadata will be passed later or
+ if there will be no initial metadata passed at all.
+ payload: The first payload value to be transmitted to the other side. May be
+ None if there is no such value or if the customer chose not to pass it at
+ operation invocation.
+ completion: A base.Completion value indicating the end of values passed to
+ the other side of the operation.
+ ticket_sink: A callable that accepts links.Tickets and delivers them to the
+ other side of the operation.
+ termination_action: A callable that accepts the outcome of the operation as
+ a base.Outcome value to be called on operation completion.
+ pool: A thread pool with which to do the work of the operation.
+
+ Returns:
+ An _interfaces.Operation for the operation.
+ """
+ lock = threading.Lock()
+ with lock:
+ termination_manager = _termination.invocation_termination_manager(
+ termination_action, pool)
+ transmission_manager = _transmission.TransmissionManager(
+ operation_id, ticket_sink, lock, pool, termination_manager)
+ expiration_manager = _expiration.invocation_expiration_manager(
+ timeout, lock, termination_manager, transmission_manager)
+ operation_context = _context.OperationContext(
+ lock, termination_manager, transmission_manager, expiration_manager)
+ emission_manager = _emission.EmissionManager(
+ lock, termination_manager, transmission_manager, expiration_manager)
+ ingestion_manager = _ingestion.invocation_ingestion_manager(
+ subscription, lock, pool, termination_manager, transmission_manager,
+ expiration_manager)
+ reception_manager = _reception.ReceptionManager(
+ termination_manager, transmission_manager, expiration_manager,
+ ingestion_manager)
+
+ termination_manager.set_expiration_manager(expiration_manager)
+ transmission_manager.set_expiration_manager(expiration_manager)
+ emission_manager.set_ingestion_manager(ingestion_manager)
+
+ transmission_manager.kick_off(
+ group, method, timeout, initial_metadata, payload, completion, None)
+
+ return _EasyOperation(
+ lock, termination_manager, transmission_manager, expiration_manager,
+ operation_context, emission_manager, reception_manager)
+
+
+def service_operate(
+ servicer_package, ticket, ticket_sink, termination_action, pool):
+ """Constructs an Operation for service of an operation.
+
+ Args:
+ servicer_package: A _utilities.ServicerPackage to be used servicing the
+ operation.
+ ticket: The first links.Ticket received for the operation.
+ ticket_sink: A callable that accepts links.Tickets and delivers them to the
+ other side of the operation.
+ termination_action: A callable that accepts the outcome of the operation as
+ a base.Outcome value to be called on operation completion.
+ pool: A thread pool with which to do the work of the operation.
+
+ Returns:
+ An _interfaces.Operation for the operation.
+ """
+ lock = threading.Lock()
+ with lock:
+ termination_manager = _termination.service_termination_manager(
+ termination_action, pool)
+ transmission_manager = _transmission.TransmissionManager(
+ ticket.operation_id, ticket_sink, lock, pool, termination_manager)
+ expiration_manager = _expiration.service_expiration_manager(
+ ticket.timeout, servicer_package.default_timeout,
+ servicer_package.maximum_timeout, lock, termination_manager,
+ transmission_manager)
+ operation_context = _context.OperationContext(
+ lock, termination_manager, transmission_manager, expiration_manager)
+ emission_manager = _emission.EmissionManager(
+ lock, termination_manager, transmission_manager, expiration_manager)
+ ingestion_manager = _ingestion.service_ingestion_manager(
+ servicer_package.servicer, operation_context, emission_manager, lock,
+ pool, termination_manager, transmission_manager, expiration_manager)
+ reception_manager = _reception.ReceptionManager(
+ termination_manager, transmission_manager, expiration_manager,
+ ingestion_manager)
+
+ termination_manager.set_expiration_manager(expiration_manager)
+ transmission_manager.set_expiration_manager(expiration_manager)
+ emission_manager.set_ingestion_manager(ingestion_manager)
+
+ reception_manager.receive_ticket(ticket)
+
+ return _EasyOperation(
+ lock, termination_manager, transmission_manager, expiration_manager,
+ operation_context, emission_manager, reception_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py
new file mode 100644
index 0000000000..b64faf8146
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_reception.py
@@ -0,0 +1,137 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ticket reception."""
+
+from grpc.framework.core import _interfaces
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.base import utilities
+from grpc.framework.interfaces.links import links
+
+_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = {
+ links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED,
+ links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED,
+ links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN,
+ links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE,
+ links.Ticket.Termination.TRANSMISSION_FAILURE:
+ base.Outcome.TRANSMISSION_FAILURE,
+ links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE,
+}
+
+
+class ReceptionManager(_interfaces.ReceptionManager):
+ """A ReceptionManager based around a _Receiver passed to it."""
+
+ def __init__(
+ self, termination_manager, transmission_manager, expiration_manager,
+ ingestion_manager):
+ """Constructor.
+
+ Args:
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ """
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = expiration_manager
+ self._ingestion_manager = ingestion_manager
+
+ self._lowest_unseen_sequence_number = 0
+ self._out_of_sequence_tickets = {}
+ self._aborted = False
+
+ def _abort(self, outcome):
+ self._aborted = True
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
+ self._expiration_manager.terminate()
+
+ def _sequence_failure(self, ticket):
+ """Determines a just-arrived ticket's sequential legitimacy.
+
+ Args:
+ ticket: A just-arrived ticket.
+
+ Returns:
+ True if the ticket is sequentially legitimate; False otherwise.
+ """
+ if ticket.sequence_number < self._lowest_unseen_sequence_number:
+ return True
+ elif ticket.sequence_number in self._out_of_sequence_tickets:
+ return True
+ else:
+ return False
+
+ def _process_one(self, ticket):
+ if ticket.sequence_number == 0:
+ self._ingestion_manager.set_group_and_method(ticket.group, ticket.method)
+ if ticket.timeout is not None:
+ self._expiration_manager.change_timeout(ticket.timeout)
+ if ticket.termination is None:
+ completion = None
+ else:
+ completion = utilities.completion(
+ ticket.terminal_metadata, ticket.code, ticket.message)
+ self._ingestion_manager.advance(
+ ticket.initial_metadata, ticket.payload, completion, ticket.allowance)
+ if ticket.allowance is not None:
+ self._transmission_manager.allowance(ticket.allowance)
+
+ def _process(self, ticket):
+ """Process those tickets ready to be processed.
+
+ Args:
+ ticket: A just-arrived ticket the sequence number of which matches this
+ _ReceptionManager's _lowest_unseen_sequence_number field.
+ """
+ while True:
+ self._process_one(ticket)
+ next_ticket = self._out_of_sequence_tickets.pop(
+ ticket.sequence_number + 1, None)
+ if next_ticket is None:
+ self._lowest_unseen_sequence_number = ticket.sequence_number + 1
+ return
+ else:
+ ticket = next_ticket
+
+ def receive_ticket(self, ticket):
+ """See _interfaces.ReceptionManager.receive_ticket for specification."""
+ if self._aborted:
+ return
+ elif self._sequence_failure(ticket):
+ self._abort(base.Outcome.RECEPTION_FAILURE)
+ elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION):
+ outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination]
+ self._abort(outcome)
+ elif ticket.sequence_number == self._lowest_unseen_sequence_number:
+ self._process(ticket)
+ else:
+ self._out_of_sequence_tickets[ticket.sequence_number] = ticket
diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py
new file mode 100644
index 0000000000..ad9f6123d8
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_termination.py
@@ -0,0 +1,212 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation termination."""
+
+import abc
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+
+
+def _invocation_completion_predicate(
+ unused_emission_complete, unused_transmission_complete,
+ unused_reception_complete, ingestion_complete):
+ return ingestion_complete
+
+
+def _service_completion_predicate(
+ unused_emission_complete, transmission_complete, unused_reception_complete,
+ unused_ingestion_complete):
+ return transmission_complete
+
+
+class TerminationManager(_interfaces.TerminationManager):
+ """A _interfaces.TransmissionManager on which another manager may be set."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_expiration_manager(self, expiration_manager):
+ """Sets the expiration manager with which this manager will interact.
+
+ Args:
+ expiration_manager: The _interfaces.ExpirationManager associated with the
+ current operation.
+ """
+ raise NotImplementedError()
+
+
+class _TerminationManager(TerminationManager):
+ """An implementation of TerminationManager."""
+
+ def __init__(self, predicate, action, pool):
+ """Constructor.
+
+ Args:
+ predicate: One of _invocation_completion_predicate or
+ _service_completion_predicate to be used to determine when the operation
+ has completed.
+ action: A behavior to pass the operation outcome on operation termination.
+ pool: A thread pool.
+ """
+ self._predicate = predicate
+ self._action = action
+ self._pool = pool
+ self._expiration_manager = None
+
+ self.outcome = None
+ self._callbacks = []
+
+ self._emission_complete = False
+ self._transmission_complete = False
+ self._reception_complete = False
+ self._ingestion_complete = False
+
+ def set_expiration_manager(self, expiration_manager):
+ self._expiration_manager = expiration_manager
+
+ def _terminate_internal_only(self, outcome):
+ """Terminates the operation.
+
+ Args:
+ outcome: A base.Outcome describing the outcome of the operation.
+ """
+ self.outcome = outcome
+ callbacks = list(self._callbacks)
+ self._callbacks = None
+
+ act = callable_util.with_exceptions_logged(
+ self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
+
+ if outcome is base.Outcome.LOCAL_FAILURE:
+ self._pool.submit(act, outcome)
+ else:
+ def call_callbacks_and_act(callbacks, outcome):
+ for callback in callbacks:
+ callback_outcome = callable_util.call_logging_exceptions(
+ callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE,
+ outcome)
+ if callback_outcome.exception is not None:
+ outcome = base.Outcome.LOCAL_FAILURE
+ break
+ act(outcome)
+
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ call_callbacks_and_act, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ callbacks, outcome)
+
+ def _terminate_and_notify(self, outcome):
+ self._terminate_internal_only(outcome)
+ self._expiration_manager.terminate()
+
+ def _perhaps_complete(self):
+ if self._predicate(
+ self._emission_complete, self._transmission_complete,
+ self._reception_complete, self._ingestion_complete):
+ self._terminate_and_notify(base.Outcome.COMPLETED)
+ return True
+ else:
+ return False
+
+ def is_active(self):
+ """See _interfaces.TerminationManager.is_active for specification."""
+ return self.outcome is None
+
+ def add_callback(self, callback):
+ """See _interfaces.TerminationManager.add_callback for specification."""
+ if self.outcome is None:
+ self._callbacks.append(callback)
+ return None
+ else:
+ return self.outcome
+
+ def emission_complete(self):
+ """See superclass method for specification."""
+ if self.outcome is None:
+ self._emission_complete = True
+ self._perhaps_complete()
+
+ def transmission_complete(self):
+ """See superclass method for specification."""
+ if self.outcome is None:
+ self._transmission_complete = True
+ return self._perhaps_complete()
+ else:
+ return False
+
+ def reception_complete(self):
+ """See superclass method for specification."""
+ if self.outcome is None:
+ self._reception_complete = True
+ self._perhaps_complete()
+
+ def ingestion_complete(self):
+ """See superclass method for specification."""
+ if self.outcome is None:
+ self._ingestion_complete = True
+ self._perhaps_complete()
+
+ def expire(self):
+ """See _interfaces.TerminationManager.expire for specification."""
+ self._terminate_internal_only(base.Outcome.EXPIRED)
+
+ def abort(self, outcome):
+ """See _interfaces.TerminationManager.abort for specification."""
+ self._terminate_and_notify(outcome)
+
+
+def invocation_termination_manager(action, pool):
+ """Creates a TerminationManager appropriate for invocation-side use.
+
+ Args:
+ action: An action to call on operation termination.
+ pool: A thread pool in which to execute the passed action and any
+ termination callbacks that are registered during the operation.
+
+ Returns:
+ A TerminationManager appropriate for invocation-side use.
+ """
+ return _TerminationManager(_invocation_completion_predicate, action, pool)
+
+
+def service_termination_manager(action, pool):
+ """Creates a TerminationManager appropriate for service-side use.
+
+ Args:
+ action: An action to call on operation termination.
+ pool: A thread pool in which to execute the passed action and any
+ termination callbacks that are registered during the operation.
+
+ Returns:
+ A TerminationManager appropriate for service-side use.
+ """
+ return _TerminationManager(_service_completion_predicate, action, pool)
diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py
new file mode 100644
index 0000000000..01894d398d
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_transmission.py
@@ -0,0 +1,294 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ticket transmission during an operation."""
+
+from grpc.framework.core import _constants
+from grpc.framework.core import _interfaces
+from grpc.framework.foundation import callable_util
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.links import links
+
+_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
+
+
+def _explode_completion(completion):
+ if completion is None:
+ return None, None, None, None
+ else:
+ return (
+ completion.terminal_metadata, completion.code, completion.message,
+ links.Ticket.Termination.COMPLETION)
+
+
+class TransmissionManager(_interfaces.TransmissionManager):
+ """An _interfaces.TransmissionManager that sends links.Tickets."""
+
+ def __init__(
+ self, operation_id, ticket_sink, lock, pool, termination_manager):
+ """Constructor.
+
+ Args:
+ operation_id: The operation's ID.
+ ticket_sink: A callable that accepts tickets and sends them to the other
+ side of the operation.
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting tickets will be
+ performed.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+ """
+ self._lock = lock
+ self._pool = pool
+ self._ticket_sink = ticket_sink
+ self._operation_id = operation_id
+ self._termination_manager = termination_manager
+ self._expiration_manager = None
+
+ self._lowest_unused_sequence_number = 0
+ self._remote_allowance = 1
+ self._remote_complete = False
+ self._timeout = None
+ self._local_allowance = 0
+ self._initial_metadata = None
+ self._payloads = []
+ self._completion = None
+ self._aborted = False
+ self._abortion_outcome = None
+ self._transmitting = False
+
+ def set_expiration_manager(self, expiration_manager):
+ """Sets the ExpirationManager with which this manager will cooperate."""
+ self._expiration_manager = expiration_manager
+
+ def _next_ticket(self):
+ """Creates the next ticket to be transmitted.
+
+ Returns:
+ A links.Ticket to be sent to the other side of the operation or None if
+ there is nothing to be sent at this time.
+ """
+ if self._aborted:
+ if self._abortion_outcome is None:
+ return None
+ else:
+ termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
+ self._abortion_outcome]
+ if termination is None:
+ return None
+ else:
+ self._abortion_outcome = None
+ return links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None,
+ None, None, None, None, None, None, None, None, None,
+ termination)
+
+ action = False
+ # TODO(nathaniel): Support other subscriptions.
+ local_subscription = links.Ticket.Subscription.FULL
+ timeout = self._timeout
+ if timeout is not None:
+ self._timeout = None
+ action = True
+ if self._local_allowance <= 0:
+ allowance = None
+ else:
+ allowance = self._local_allowance
+ self._local_allowance = 0
+ action = True
+ initial_metadata = self._initial_metadata
+ if initial_metadata is not None:
+ self._initial_metadata = None
+ action = True
+ if not self._payloads or self._remote_allowance <= 0:
+ payload = None
+ else:
+ payload = self._payloads.pop(0)
+ self._remote_allowance -= 1
+ action = True
+ if self._completion is None or self._payloads:
+ terminal_metadata, code, message, termination = None, None, None, None
+ else:
+ terminal_metadata, code, message, termination = _explode_completion(
+ self._completion)
+ self._completion = None
+ action = True
+
+ if action:
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ local_subscription, timeout, allowance, initial_metadata, payload,
+ terminal_metadata, code, message, termination)
+ self._lowest_unused_sequence_number += 1
+ return ticket
+ else:
+ return None
+
+ def _transmit(self, ticket):
+ """Commences the transmission loop sending tickets.
+
+ Args:
+ ticket: A links.Ticket to be sent to the other side of the operation.
+ """
+ def transmit(ticket):
+ while True:
+ transmission_outcome = callable_util.call_logging_exceptions(
+ self._ticket_sink, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket)
+ if transmission_outcome.exception is None:
+ with self._lock:
+ if ticket.termination is links.Ticket.Termination.COMPLETION:
+ self._termination_manager.transmission_complete()
+ ticket = self._next_ticket()
+ if ticket is None:
+ self._transmitting = False
+ return
+ else:
+ with self._lock:
+ if self._termination_manager.outcome is None:
+ self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE)
+ self._expiration_manager.terminate()
+ return
+
+ self._pool.submit(callable_util.with_exceptions_logged(
+ transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket)
+ self._transmitting = True
+
+ def kick_off(
+ self, group, method, timeout, initial_metadata, payload, completion,
+ allowance):
+ """See _interfaces.TransmissionManager.kickoff for specification."""
+ # TODO(nathaniel): Support other subscriptions.
+ subscription = links.Ticket.Subscription.FULL
+ terminal_metadata, code, message, termination = _explode_completion(
+ completion)
+ self._remote_allowance = 1 if payload is None else 0
+ ticket = links.Ticket(
+ self._operation_id, 0, group, method, subscription, timeout, allowance,
+ initial_metadata, payload, terminal_metadata, code, message,
+ termination)
+ self._lowest_unused_sequence_number = 1
+ self._transmit(ticket)
+
+ def advance(self, initial_metadata, payload, completion, allowance):
+ """See _interfaces.TransmissionManager.advance for specification."""
+ effective_initial_metadata = initial_metadata
+ effective_payload = payload
+ effective_completion = completion
+ if allowance is not None and not self._remote_complete:
+ effective_allowance = allowance
+ else:
+ effective_allowance = None
+ if self._transmitting:
+ if effective_initial_metadata is not None:
+ self._initial_metadata = effective_initial_metadata
+ if effective_payload is not None:
+ self._payloads.append(effective_payload)
+ if effective_completion is not None:
+ self._completion = effective_completion
+ if effective_allowance is not None:
+ self._local_allowance += effective_allowance
+ else:
+ if effective_payload is not None:
+ if 0 < self._remote_allowance:
+ ticket_payload = effective_payload
+ self._remote_allowance -= 1
+ else:
+ self._payloads.append(effective_payload)
+ ticket_payload = None
+ else:
+ ticket_payload = None
+ if effective_completion is not None and not self._payloads:
+ ticket_completion = effective_completion
+ else:
+ self._completion = effective_completion
+ ticket_completion = None
+ if any(
+ (effective_initial_metadata, ticket_payload, ticket_completion,
+ effective_allowance)):
+ terminal_metadata, code, message, termination = _explode_completion(
+ completion)
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ None, None, allowance, effective_initial_metadata, ticket_payload,
+ terminal_metadata, code, message, termination)
+ self._lowest_unused_sequence_number += 1
+ self._transmit(ticket)
+
+ def timeout(self, timeout):
+ """See _interfaces.TransmissionManager.timeout for specification."""
+ if self._transmitting:
+ self._timeout = timeout
+ else:
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ None, timeout, None, None, None, None, None, None, None)
+ self._lowest_unused_sequence_number += 1
+ self._transmit(ticket)
+
+ def allowance(self, allowance):
+ """See _interfaces.TransmissionManager.allowance for specification."""
+ if self._transmitting or not self._payloads:
+ self._remote_allowance += allowance
+ else:
+ self._remote_allowance += allowance - 1
+ payload = self._payloads.pop(0)
+ if self._payloads:
+ completion = None
+ else:
+ completion = self._completion
+ self._completion = None
+ terminal_metadata, code, message, termination = _explode_completion(
+ completion)
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None, None,
+ None, None, None, None, payload, terminal_metadata, code, message,
+ termination)
+ self._lowest_unused_sequence_number += 1
+ self._transmit(ticket)
+
+ def remote_complete(self):
+ """See _interfaces.TransmissionManager.remote_complete for specification."""
+ self._remote_complete = True
+ self._local_allowance = 0
+
+ def abort(self, outcome):
+ """See _interfaces.TransmissionManager.abort for specification."""
+ if self._transmitting:
+ self._aborted, self._abortion_outcome = True, outcome
+ else:
+ self._aborted = True
+ if outcome is not None:
+ termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
+ outcome]
+ if termination is not None:
+ ticket = links.Ticket(
+ self._operation_id, self._lowest_unused_sequence_number, None,
+ None, None, None, None, None, None, None, None, None,
+ termination)
+ self._transmit(ticket)
diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py
new file mode 100644
index 0000000000..5b0d798751
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/_utilities.py
@@ -0,0 +1,46 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Package-internal utilities."""
+
+import collections
+
+
+class ServicerPackage(
+ collections.namedtuple(
+ 'ServicerPackage', ('servicer', 'default_timeout', 'maximum_timeout'))):
+ """A trivial bundle class.
+
+ Attributes:
+ servicer: A base.Servicer.
+ default_timeout: A float indicating the length of time in seconds to allow
+ for an operation invoked without a timeout.
+ maximum_timeout: A float indicating the maximum length of time in seconds to
+ allow for an operation.
+ """
diff --git a/src/python/grpcio/grpc/framework/core/implementations.py b/src/python/grpcio/grpc/framework/core/implementations.py
new file mode 100644
index 0000000000..364a7faed4
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/core/implementations.py
@@ -0,0 +1,62 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Entry points into the ticket-exchange-based base layer implementation."""
+
+# base and links are referenced from specification in this module.
+from grpc.framework.core import _end
+from grpc.framework.interfaces.base import base # pylint: disable=unused-import
+from grpc.framework.interfaces.links import links # pylint: disable=unused-import
+
+
+def invocation_end_link():
+ """Creates a base.End-links.Link suitable for operation invocation.
+
+ Returns:
+ An object that is both a base.End and a links.Link, that supports operation
+ invocation, and that translates operation invocation into ticket exchange.
+ """
+ return _end.serviceless_end_link()
+
+
+def service_end_link(servicer, default_timeout, maximum_timeout):
+ """Creates a base.End-links.Link suitable for operation service.
+
+ Args:
+ servicer: A base.Servicer for servicing operations.
+ default_timeout: A length of time in seconds to be used as the default
+ time alloted for a single operation.
+ maximum_timeout: A length of time in seconds to be used as the maximum
+ time alloted for a single operation.
+
+ Returns:
+ An object that is both a base.End and a links.Link and that services
+ operations that arrive at it through ticket exchange.
+ """
+ return _end.serviceful_end_link(servicer, default_timeout, maximum_timeout)
diff --git a/src/python/grpcio/grpc/framework/interfaces/base/__init__.py b/src/python/grpcio/grpc/framework/interfaces/base/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/interfaces/base/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py
new file mode 100644
index 0000000000..76e0a5bdae
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py
@@ -0,0 +1,290 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""The base interface of RPC Framework.
+
+Implementations of this interface support the conduct of "operations":
+exchanges between two distinct ends of an arbitrary number of data payloads
+and metadata such as a name for the operation, initial and terminal metadata
+in each direction, and flow control. These operations may be used for transfers
+of data, remote procedure calls, status indication, or anything else
+applications choose.
+"""
+
+# threading is referenced from specification in this module.
+import abc
+import enum
+import threading
+
+# abandonment is referenced from specification in this module.
+from grpc.framework.foundation import abandonment # pylint: disable=unused-import
+
+
+class NoSuchMethodError(Exception):
+ """Indicates that an unrecognized operation has been called."""
+
+
+@enum.unique
+class Outcome(enum.Enum):
+ """Operation outcomes."""
+
+ COMPLETED = 'completed'
+ CANCELLED = 'cancelled'
+ EXPIRED = 'expired'
+ LOCAL_SHUTDOWN = 'local shutdown'
+ REMOTE_SHUTDOWN = 'remote shutdown'
+ RECEPTION_FAILURE = 'reception failure'
+ TRANSMISSION_FAILURE = 'transmission failure'
+ LOCAL_FAILURE = 'local failure'
+ REMOTE_FAILURE = 'remote failure'
+
+
+class Completion(object):
+ """An aggregate of the values exchanged upon operation completion.
+
+ Attributes:
+ terminal_metadata: A terminal metadata value for the operaton.
+ code: A code value for the operation.
+ message: A message value for the operation.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class OperationContext(object):
+ """Provides operation-related information and action."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def outcome(self):
+ """Indicates the operation's outcome (or that the operation is ongoing).
+
+ Returns:
+ None if the operation is still active or the Outcome value for the
+ operation if it has terminated.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_termination_callback(self, callback):
+ """Adds a function to be called upon operation termination.
+
+ Args:
+ callback: A callable to be passed an Outcome value on operation
+ termination.
+
+ Returns:
+ None if the operation has not yet terminated and the passed callback will
+ later be called when it does terminate, or if the operation has already
+ terminated an Outcome value describing the operation termination and the
+ passed callback will not be called as a result of this method call.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def time_remaining(self):
+ """Describes the length of allowed time remaining for the operation.
+
+ Returns:
+ A nonnegative float indicating the length of allowed time in seconds
+ remaining for the operation to complete before it is considered to have
+ timed out. Zero is returned if the operation has terminated.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the operation if the operation has not yet terminated."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def fail(self, exception):
+ """Indicates that the operation has failed.
+
+ Args:
+ exception: An exception germane to the operation failure. May be None.
+ """
+ raise NotImplementedError()
+
+
+class Operator(object):
+ """An interface through which to participate in an operation."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def advance(
+ self, initial_metadata=None, payload=None, completion=None,
+ allowance=None):
+ """Progresses the operation.
+
+ Args:
+ initial_metadata: An initial metadata value. Only one may ever be
+ communicated in each direction for an operation, and they must be
+ communicated no later than either the first payload or the completion.
+ payload: A payload value.
+ completion: A Completion value. May only ever be non-None once in either
+ direction, and no payloads may be passed after it has been communicated.
+ allowance: A positive integer communicating the number of additional
+ payloads allowed to be passed by the remote side of the operation.
+ """
+ raise NotImplementedError()
+
+
+class Subscription(object):
+ """Describes customer code's interest in values from the other side.
+
+ Attributes:
+ kind: A Kind value describing the overall kind of this value.
+ termination_callback: A callable to be passed the Outcome associated with
+ the operation after it has terminated. Must be non-None if kind is
+ Kind.TERMINATION_ONLY. Must be None otherwise.
+ allowance: A callable behavior that accepts positive integers representing
+ the number of additional payloads allowed to be passed to the other side
+ of the operation. Must be None if kind is Kind.FULL. Must not be None
+ otherwise.
+ operator: An Operator to be passed values from the other side of the
+ operation. Must be non-None if kind is Kind.FULL. Must be None otherwise.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+
+ NONE = 'none'
+ TERMINATION_ONLY = 'termination only'
+ FULL = 'full'
+
+
+class Servicer(object):
+ """Interface for service implementations."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, group, method, context, output_operator):
+ """Services an operation.
+
+ Args:
+ group: The group identifier of the operation to be serviced.
+ method: The method identifier of the operation to be serviced.
+ context: An OperationContext object affording contextual information and
+ actions.
+ output_operator: An Operator that will accept output values of the
+ operation.
+
+ Returns:
+ A Subscription via which this object may or may not accept more values of
+ the operation.
+
+ Raises:
+ NoSuchMethodError: If this Servicer does not handle operations with the
+ given group and method.
+ abandonment.Abandoned: If the operation has been aborted and there no
+ longer is any reason to service the operation.
+ """
+ raise NotImplementedError()
+
+
+class End(object):
+ """Common type for entry-point objects on both sides of an operation."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def start(self):
+ """Starts this object's service of operations."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stop(self, grace):
+ """Stops this object's service of operations.
+
+ This object will refuse service of new operations as soon as this method is
+ called but operations under way at the time of the call may be given a
+ grace period during which they are allowed to finish.
+
+ Args:
+ grace: A duration of time in seconds to allow ongoing operations to
+ terminate before being forcefully terminated by the stopping of this
+ End. May be zero to terminate all ongoing operations and immediately
+ stop.
+
+ Returns:
+ A threading.Event that will be set to indicate all operations having
+ terminated and this End having completely stopped. The returned event
+ may not be set until after the full grace period (if some ongoing
+ operation continues for the full length of the period) or it may be set
+ much sooner (if for example this End had no operations in progress at
+ the time its stop method was called).
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def operate(
+ self, group, method, subscription, timeout, initial_metadata=None,
+ payload=None, completion=None):
+ """Commences an operation.
+
+ Args:
+ group: The group identifier of the invoked operation.
+ method: The method identifier of the invoked operation.
+ subscription: A Subscription to which the results of the operation will be
+ passed.
+ timeout: A length of time in seconds to allow for the operation.
+ initial_metadata: An initial metadata value to be sent to the other side
+ of the operation. May be None if the initial metadata will be later
+ passed via the returned operator or if there will be no initial metadata
+ passed at all.
+ payload: An initial payload for the operation.
+ completion: A Completion value indicating the end of transmission to the
+ other side of the operation.
+
+ Returns:
+ A pair of objects affording information about the operation and action
+ continuing the operation. The first element of the returned pair is an
+ OperationContext for the operation and the second element of the
+ returned pair is an Operator to which operation values not passed in
+ this call should later be passed.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def operation_stats(self):
+ """Reports the number of terminated operations broken down by outcome.
+
+ Returns:
+ A dictionary from Outcome value to an integer identifying the number
+ of operations that terminated with that outcome.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_idle_action(self, action):
+ """Adds an action to be called when this End has no ongoing operations.
+
+ Args:
+ action: A callable that accepts no arguments.
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/interfaces/base/utilities.py b/src/python/grpcio/grpc/framework/interfaces/base/utilities.py
new file mode 100644
index 0000000000..a9ee1a0981
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/interfaces/base/utilities.py
@@ -0,0 +1,79 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for use with the base interface of RPC Framework."""
+
+import collections
+
+from grpc.framework.interfaces.base import base
+
+
+class _Completion(
+ base.Completion,
+ collections.namedtuple(
+ '_Completion', ('terminal_metadata', 'code', 'message',))):
+ """A trivial implementation of base.Completion."""
+
+
+class _Subscription(
+ base.Subscription,
+ collections.namedtuple(
+ '_Subscription',
+ ('kind', 'termination_callback', 'allowance', 'operator',))):
+ """A trivial implementation of base.Subscription."""
+
+_NONE_SUBSCRIPTION = _Subscription(
+ base.Subscription.Kind.NONE, None, None, None)
+
+
+def completion(terminal_metadata, code, message):
+ """Creates a base.Completion aggregating the given operation values.
+
+ Args:
+ terminal_metadata: A terminal metadata value for an operaton.
+ code: A code value for an operation.
+ message: A message value for an operation.
+
+ Returns:
+ A base.Completion aggregating the given operation values.
+ """
+ return _Completion(terminal_metadata, code, message)
+
+
+def full_subscription(operator):
+ """Creates a "full" base.Subscription for the given base.Operator.
+
+ Args:
+ operator: A base.Operator to be used in an operation.
+
+ Returns:
+ A base.Subscription of kind base.Subscription.Kind.FULL wrapping the given
+ base.Operator.
+ """
+ return _Subscription(base.Subscription.Kind.FULL, None, None, operator)
diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py
index 5ebbac8a6f..069ff024dd 100644
--- a/src/python/grpcio/grpc/framework/interfaces/links/links.py
+++ b/src/python/grpcio/grpc/framework/interfaces/links/links.py
@@ -98,7 +98,7 @@ class Ticket(
COMPLETION = 'completion'
CANCELLATION = 'cancellation'
EXPIRATION = 'expiration'
- LOCAL_SHUTDOWN = 'local shutdown'
+ SHUTDOWN = 'shutdown'
RECEPTION_FAILURE = 'reception failure'
TRANSMISSION_FAILURE = 'transmission failure'
LOCAL_FAILURE = 'local failure'
diff --git a/src/python/grpcio_health_checking/MANIFEST.in b/src/python/grpcio_health_checking/MANIFEST.in
new file mode 100644
index 0000000000..498b55f20a
--- /dev/null
+++ b/src/python/grpcio_health_checking/MANIFEST.in
@@ -0,0 +1,2 @@
+graft grpc
+include commands.py
diff --git a/src/python/grpcio_health_checking/README.rst b/src/python/grpcio_health_checking/README.rst
new file mode 100644
index 0000000000..600734e50d
--- /dev/null
+++ b/src/python/grpcio_health_checking/README.rst
@@ -0,0 +1,9 @@
+gRPC Python Health Checking
+===========================
+
+Reference package for GRPC Python health checking.
+
+Dependencies
+------------
+
+Depends on the `grpcio` package, available from PyPI via `pip install grpcio`.
diff --git a/src/python/grpcio_health_checking/commands.py b/src/python/grpcio_health_checking/commands.py
new file mode 100644
index 0000000000..6a95e679c4
--- /dev/null
+++ b/src/python/grpcio_health_checking/commands.py
@@ -0,0 +1,80 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Provides distutils command classes for the GRPC Python setup process."""
+
+import distutils
+import glob
+import os
+import os.path
+import subprocess
+import sys
+
+import setuptools
+from setuptools.command import build_py
+
+
+class BuildProtoModules(setuptools.Command):
+ """Command to generate project *_pb2.py modules from proto files."""
+
+ description = ''
+ user_options = []
+
+ def initialize_options(self):
+ pass
+
+ def finalize_options(self):
+ self.protoc_command = 'protoc'
+ self.grpc_python_plugin_command = distutils.spawn.find_executable(
+ 'grpc_python_plugin')
+
+ def run(self):
+ paths = []
+ root_directory = os.getcwd()
+ for walk_root, directories, filenames in os.walk(root_directory):
+ for filename in filenames:
+ if filename.endswith('.proto'):
+ paths.append(os.path.join(walk_root, filename))
+ command = [
+ self.protoc_command,
+ '--plugin=protoc-gen-python-grpc={}'.format(
+ self.grpc_python_plugin_command),
+ '-I {}'.format(root_directory),
+ '--python_out={}'.format(root_directory),
+ '--python-grpc_out={}'.format(root_directory),
+ ] + paths
+ subprocess.check_call(' '.join(command), cwd=root_directory, shell=True)
+
+
+class BuildPy(build_py.build_py):
+ """Custom project build command."""
+
+ def run(self):
+ self.run_command('build_proto_modules')
+ build_py.build_py.run(self)
diff --git a/src/python/grpcio_health_checking/grpc/__init__.py b/src/python/grpcio_health_checking/grpc/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_health_checking/grpc/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_health_checking/grpc/health/__init__.py b/src/python/grpcio_health_checking/grpc/health/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_health_checking/grpc/health/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_health_checking/grpc/health/v1alpha/__init__.py b/src/python/grpcio_health_checking/grpc/health/v1alpha/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_health_checking/grpc/health/v1alpha/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_health_checking/grpc/health/v1alpha/health.proto b/src/python/grpcio_health_checking/grpc/health/v1alpha/health.proto
new file mode 100644
index 0000000000..57f4aaa9c0
--- /dev/null
+++ b/src/python/grpcio_health_checking/grpc/health/v1alpha/health.proto
@@ -0,0 +1,49 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+package grpc.health.v1alpha;
+
+message HealthCheckRequest {
+ string service = 1;
+}
+
+message HealthCheckResponse {
+ enum ServingStatus {
+ UNKNOWN = 0;
+ SERVING = 1;
+ NOT_SERVING = 2;
+ }
+ ServingStatus status = 1;
+}
+
+service Health {
+ rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
+}
diff --git a/src/python/grpcio_health_checking/grpc/health/v1alpha/health.py b/src/python/grpcio_health_checking/grpc/health/v1alpha/health.py
new file mode 100644
index 0000000000..9dfcd962f0
--- /dev/null
+++ b/src/python/grpcio_health_checking/grpc/health/v1alpha/health.py
@@ -0,0 +1,129 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Reference implementation for health checking in gRPC Python."""
+
+import abc
+import enum
+import threading
+
+from grpc.health.v1alpha import health_pb2
+
+
+@enum.unique
+class HealthStatus(enum.Enum):
+ """Statuses for a service mirroring the reference health.proto's values."""
+ UNKNOWN = health_pb2.HealthCheckResponse.UNKNOWN
+ SERVING = health_pb2.HealthCheckResponse.SERVING
+ NOT_SERVING = health_pb2.HealthCheckResponse.NOT_SERVING
+
+
+class _HealthServicer(health_pb2.EarlyAdopterHealthServicer):
+ """Servicer handling RPCs for service statuses."""
+
+ def __init__(self):
+ self._server_status_lock = threading.Lock()
+ self._server_status = {}
+
+ def Check(self, request, context):
+ with self._server_status_lock:
+ if request.service not in self._server_status:
+ # TODO(atash): once the Python API has a way of setting the server
+ # status, bring us into conformance with the health check spec by
+ # returning the NOT_FOUND status here.
+ raise NotImplementedError()
+ else:
+ return health_pb2.HealthCheckResponse(
+ status=self._server_status[request.service].value)
+
+ def set(service, status):
+ if not isinstance(status, HealthStatus):
+ raise TypeError('expected grpc.health.v1alpha.health.HealthStatus '
+ 'for argument `status` but got {}'.format(status))
+ with self._server_status_lock:
+ self._server_status[service] = status
+
+
+class HealthServer(health_pb2.EarlyAdopterHealthServer):
+ """Interface for the reference gRPC Python health server."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def start(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stop(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def set(self, service, status):
+ """Set the status of the given service.
+
+ Args:
+ service (str): service name of the service to set the reported status of
+ status (HealthStatus): status to set for the specified service
+ """
+ raise NotImplementedError()
+
+
+class _HealthServerImplementation(HealthServer):
+ """Implementation for the reference gRPC Python health server."""
+
+ def __init__(self, server, servicer):
+ self._server = server
+ self._servicer = servicer
+
+ def start(self):
+ self._server.start()
+
+ def stop(self):
+ self._server.stop()
+
+ def set(self, service, status):
+ self._servicer.set(service, status)
+
+
+def create_Health_server(port, private_key=None, certificate_chain=None):
+ """Get a HealthServer instance.
+
+ Args:
+ port (int): port number passed through to health_pb2 server creation
+ routine.
+ private_key (str): to-be-created server's desired private key
+ certificate_chain (str): to-be-created server's desired certificate chain
+
+ Returns:
+ An instance of HealthServer (conforming thus to
+ EarlyAdopterHealthServer and providing a method to set server status)."""
+ servicer = _HealthServicer()
+ server = health_pb2.early_adopter_create_Health_server(
+ servicer, port=port, private_key=private_key,
+ certificate_chain=certificate_chain)
+ return _HealthServerImplementation(server, servicer)
diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py
new file mode 100644
index 0000000000..fcde0dab8c
--- /dev/null
+++ b/src/python/grpcio_health_checking/setup.py
@@ -0,0 +1,72 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Setup module for the GRPC Python package's optional health checking."""
+
+import os
+import os.path
+import sys
+
+from distutils import core as _core
+import setuptools
+
+# Ensure we're in the proper directory whether or not we're being used by pip.
+os.chdir(os.path.dirname(os.path.abspath(__file__)))
+
+# Break import-style to ensure we can actually find our commands module.
+import commands
+
+_PACKAGES = (
+ setuptools.find_packages('.')
+)
+
+_PACKAGE_DIRECTORIES = {
+ '': '.',
+}
+
+_INSTALL_REQUIRES = (
+ 'grpcio>=0.10.0a0',
+)
+
+_SETUP_REQUIRES = _INSTALL_REQUIRES
+
+_COMMAND_CLASS = {
+ 'build_proto_modules': commands.BuildProtoModules,
+ 'build_py': commands.BuildPy,
+}
+
+setuptools.setup(
+ name='grpcio_health_checking',
+ version='0.10.0a0',
+ packages=list(_PACKAGES),
+ package_dir=_PACKAGE_DIRECTORIES,
+ install_requires=_INSTALL_REQUIRES,
+ setup_requires=_SETUP_REQUIRES,
+ cmdclass=_COMMAND_CLASS
+)
diff --git a/src/python/grpcio_test/grpc_interop/_interop_test_case.py b/src/python/grpcio_test/grpc_interop/_interop_test_case.py
index ed8f7ef009..b6d06b300d 100644
--- a/src/python/grpcio_test/grpc_interop/_interop_test_case.py
+++ b/src/python/grpcio_test/grpc_interop/_interop_test_case.py
@@ -59,3 +59,6 @@ class InteropTestCase(object):
def testCancelAfterFirstResponse(self):
methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability(self.stub, None)
+
+ def testTimeoutOnSleepingServer(self):
+ methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability(self.stub, None)
diff --git a/src/python/grpcio_test/grpc_interop/methods.py b/src/python/grpcio_test/grpc_interop/methods.py
index f4c94685ee..7a831f3cbd 100644
--- a/src/python/grpcio_test/grpc_interop/methods.py
+++ b/src/python/grpcio_test/grpc_interop/methods.py
@@ -33,10 +33,12 @@ import enum
import json
import os
import threading
+import time
from oauth2client import client as oauth2client_client
from grpc.framework.alpha import utilities
+from grpc.framework.alpha import exceptions
from grpc_interop import empty_pb2
from grpc_interop import messages_pb2
@@ -318,6 +320,24 @@ def _cancel_after_first_response(stub):
raise ValueError('expected call to be cancelled')
+def _timeout_on_sleeping_server(stub):
+ request_payload_size = 27182
+ with stub, _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe, 0.001)
+
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ payload=messages_pb2.Payload(body=b'\x00' * request_payload_size))
+ pipe.add(request)
+ time.sleep(0.1)
+ try:
+ next(response_iterator)
+ except exceptions.ExpirationError:
+ pass
+ else:
+ raise ValueError('expected call to exceed deadline')
+
+
def _compute_engine_creds(stub, args):
response = _large_unary_common_behavior(stub, True, True)
if args.default_service_account != response.username:
@@ -351,6 +371,7 @@ class TestCase(enum.Enum):
CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
SERVICE_ACCOUNT_CREDS = 'service_account_creds'
+ TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
def test_interoperability(self, stub, args):
if self is TestCase.EMPTY_UNARY:
@@ -367,6 +388,8 @@ class TestCase(enum.Enum):
_cancel_after_begin(stub)
elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE:
_cancel_after_first_response(stub)
+ elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER:
+ _timeout_on_sleeping_server(stub)
elif self is TestCase.COMPUTE_ENGINE_CREDS:
_compute_engine_creds(stub, args)
elif self is TestCase.SERVICE_ACCOUNT_CREDS:
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/__init__.py b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py
new file mode 100644
index 0000000000..b200d129a9
--- /dev/null
+++ b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py
@@ -0,0 +1,541 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+import contextlib
+import distutils.spawn
+import errno
+import itertools
+import os
+import pkg_resources
+import shutil
+import subprocess
+import sys
+import tempfile
+import threading
+import time
+import unittest
+
+from grpc.framework.alpha import exceptions
+from grpc.framework.foundation import future
+
+# Identifiers of entities we expect to find in the generated module.
+SERVICER_IDENTIFIER = 'EarlyAdopterTestServiceServicer'
+SERVER_IDENTIFIER = 'EarlyAdopterTestServiceServer'
+STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub'
+SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server'
+STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub'
+
+# The timeout used in tests of RPCs that are supposed to expire.
+SHORT_TIMEOUT = 2
+# The timeout used in tests of RPCs that are not supposed to expire. The
+# absurdly large value doesn't matter since no passing execution of this test
+# module will ever wait the duration.
+LONG_TIMEOUT = 600
+NO_DELAY = 0
+
+
+class _ServicerMethods(object):
+
+ def __init__(self, test_pb2, delay):
+ self._condition = threading.Condition()
+ self._delay = delay
+ self._paused = False
+ self._fail = False
+ self._test_pb2 = test_pb2
+
+ @contextlib.contextmanager
+ def pause(self): # pylint: disable=invalid-name
+ with self._condition:
+ self._paused = True
+ yield
+ with self._condition:
+ self._paused = False
+ self._condition.notify_all()
+
+ @contextlib.contextmanager
+ def fail(self): # pylint: disable=invalid-name
+ with self._condition:
+ self._fail = True
+ yield
+ with self._condition:
+ self._fail = False
+
+ def _control(self): # pylint: disable=invalid-name
+ with self._condition:
+ if self._fail:
+ raise ValueError()
+ while self._paused:
+ self._condition.wait()
+ time.sleep(self._delay)
+
+ def UnaryCall(self, request, unused_rpc_context):
+ response = self._test_pb2.SimpleResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * request.response_size
+ self._control()
+ return response
+
+ def StreamingOutputCall(self, request, unused_rpc_context):
+ for parameter in request.response_parameters:
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * parameter.size
+ self._control()
+ yield response
+
+ def StreamingInputCall(self, request_iter, unused_rpc_context):
+ response = self._test_pb2.StreamingInputCallResponse()
+ aggregated_payload_size = 0
+ for request in request_iter:
+ aggregated_payload_size += len(request.payload.payload_compressable)
+ response.aggregated_payload_size = aggregated_payload_size
+ self._control()
+ return response
+
+ def FullDuplexCall(self, request_iter, unused_rpc_context):
+ for request in request_iter:
+ for parameter in request.response_parameters:
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * parameter.size
+ self._control()
+ yield response
+
+ def HalfDuplexCall(self, request_iter, unused_rpc_context):
+ responses = []
+ for request in request_iter:
+ for parameter in request.response_parameters:
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * parameter.size
+ self._control()
+ responses.append(response)
+ for response in responses:
+ yield response
+
+
+@contextlib.contextmanager
+def _CreateService(test_pb2, delay):
+ """Provides a servicer backend and a stub.
+
+ The servicer is just the implementation
+ of the actual servicer passed to the face player of the python RPC
+ implementation; the two are detached.
+
+ Non-zero delay puts a delay on each call to the servicer, representative of
+ communication latency. Timeout is the default timeout for the stub while
+ waiting for the service.
+
+ Args:
+ test_pb2: The test_pb2 module generated by this test.
+ delay: Delay in seconds per response from the servicer.
+
+ Yields:
+ A (servicer_methods, servicer, stub) three-tuple where servicer_methods is
+ the back-end of the service bound to the stub and the server and stub
+ are both activated and ready for use.
+ """
+ servicer_methods = _ServicerMethods(test_pb2, delay)
+
+ class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
+
+ def UnaryCall(self, request, context):
+ return servicer_methods.UnaryCall(request, context)
+
+ def StreamingOutputCall(self, request, context):
+ return servicer_methods.StreamingOutputCall(request, context)
+
+ def StreamingInputCall(self, request_iter, context):
+ return servicer_methods.StreamingInputCall(request_iter, context)
+
+ def FullDuplexCall(self, request_iter, context):
+ return servicer_methods.FullDuplexCall(request_iter, context)
+
+ def HalfDuplexCall(self, request_iter, context):
+ return servicer_methods.HalfDuplexCall(request_iter, context)
+
+ servicer = Servicer()
+ server = getattr(
+ test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, 0)
+ with server:
+ port = server.port()
+ stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', port)
+ with stub:
+ yield servicer_methods, stub, server
+
+
+def _streaming_input_request_iterator(test_pb2):
+ for _ in range(3):
+ request = test_pb2.StreamingInputCallRequest()
+ request.payload.payload_type = test_pb2.COMPRESSABLE
+ request.payload.payload_compressable = 'a'
+ yield request
+
+
+def _streaming_output_request(test_pb2):
+ request = test_pb2.StreamingOutputCallRequest()
+ sizes = [1, 2, 3]
+ request.response_parameters.add(size=sizes[0], interval_us=0)
+ request.response_parameters.add(size=sizes[1], interval_us=0)
+ request.response_parameters.add(size=sizes[2], interval_us=0)
+ return request
+
+
+def _full_duplex_request_iterator(test_pb2):
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=1, interval_us=0)
+ yield request
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=2, interval_us=0)
+ request.response_parameters.add(size=3, interval_us=0)
+ yield request
+
+
+class PythonPluginTest(unittest.TestCase):
+ """Test case for the gRPC Python protoc-plugin.
+
+ While reading these tests, remember that the futures API
+ (`stub.method.async()`) only gives futures for the *non-streaming* responses,
+ else it behaves like its blocking cousin.
+ """
+
+ def setUp(self):
+ # Assume that the appropriate protoc and grpc_python_plugins are on the
+ # path.
+ protoc_command = 'protoc'
+ protoc_plugin_filename = distutils.spawn.find_executable(
+ 'grpc_python_plugin')
+ test_proto_filename = pkg_resources.resource_filename(
+ 'grpc_protoc_plugin', 'test.proto')
+ if not os.path.isfile(protoc_command):
+ # Assume that if we haven't built protoc that it's on the system.
+ protoc_command = 'protoc'
+
+ # Ensure that the output directory exists.
+ self.outdir = tempfile.mkdtemp()
+
+ # Invoke protoc with the plugin.
+ cmd = [
+ protoc_command,
+ '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
+ '-I .',
+ '--python_out=%s' % self.outdir,
+ '--python-grpc_out=%s' % self.outdir,
+ os.path.basename(test_proto_filename),
+ ]
+ subprocess.check_call(' '.join(cmd), shell=True, env=os.environ,
+ cwd=os.path.dirname(test_proto_filename))
+ sys.path.append(self.outdir)
+
+ def tearDown(self):
+ try:
+ shutil.rmtree(self.outdir)
+ except OSError as exc:
+ if exc.errno != errno.ENOENT:
+ raise
+
+ # TODO(atash): Figure out which of these tests is hanging flakily with small
+ # probability.
+
+ def testImportAttributes(self):
+ # check that we can access the generated module and its members.
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, SERVER_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None))
+
+ def testUpDown(self):
+ import test_pb2
+ with _CreateService(
+ test_pb2, NO_DELAY) as (servicer, stub, unused_server):
+ request = test_pb2.SimpleRequest(response_size=13)
+
+ def testUnaryCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
+ request = test_pb2.SimpleRequest(response_size=13)
+ response = stub.UnaryCall(request, timeout)
+ expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testUnaryCallAsync(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ # Check that the call does not block waiting for the server to respond.
+ with methods.pause():
+ response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
+ response = response_future.result()
+ expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testUnaryCallAsyncExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ request = test_pb2.SimpleRequest(response_size=13)
+ with methods.pause():
+ response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ response_future.result()
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testUnaryCallAsyncCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ response_future = stub.UnaryCall.async(request, 1)
+ response_future.cancel()
+ self.assertTrue(response_future.cancelled())
+
+ def testUnaryCallAsyncFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
+ self.assertIsNotNone(response_future.exception())
+
+ def testStreamingOutputCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ responses = stub.StreamingOutputCall(request, LONG_TIMEOUT)
+ expected_responses = methods.StreamingOutputCall(
+ request, 'not a real RpcContext!')
+ for expected_response, response in itertools.izip_longest(
+ expected_responses, responses):
+ self.assertEqual(expected_response, response)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testStreamingOutputCallExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ list(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testStreamingOutputCallCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ unused_methods, stub, unused_server):
+ responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
+ next(responses)
+ responses.cancel()
+ with self.assertRaises(future.CancelledError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this times out '
+ 'instead of raising the proper error.')
+ def testStreamingOutputCallFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ responses = stub.StreamingOutputCall(request, 1)
+ self.assertIsNotNone(responses)
+ with self.assertRaises(exceptions.ServicerError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testStreamingInputCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ response = stub.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
+ expected_response = methods.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testStreamingInputCallAsync(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
+ response = response_future.result()
+ expected_response = methods.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testStreamingInputCallAsyncExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ response_future.result()
+ self.assertIsInstance(
+ response_future.exception(), exceptions.ExpirationError)
+
+ def testStreamingInputCallAsyncCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), timeout)
+ response_future.cancel()
+ self.assertTrue(response_future.cancelled())
+ with self.assertRaises(future.CancelledError):
+ response_future.result()
+
+ def testStreamingInputCallAsyncFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
+ self.assertIsNotNone(response_future.exception())
+
+ def testFullDuplexCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ responses = stub.FullDuplexCall(
+ _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT)
+ expected_responses = methods.FullDuplexCall(
+ _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
+ for expected_response, response in itertools.izip_longest(
+ expected_responses, responses):
+ self.assertEqual(expected_response, response)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testFullDuplexCallExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ list(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testFullDuplexCallCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
+ next(responses)
+ responses.cancel()
+ with self.assertRaises(future.CancelledError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this hangs forever '
+ 'and fix.')
+ def testFullDuplexCallFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
+ self.assertIsNotNone(responses)
+ with self.assertRaises(exceptions.ServicerError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testHalfDuplexCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ def half_duplex_request_iterator():
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=1, interval_us=0)
+ yield request
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=2, interval_us=0)
+ request.response_parameters.add(size=3, interval_us=0)
+ yield request
+ responses = stub.HalfDuplexCall(
+ half_duplex_request_iterator(), LONG_TIMEOUT)
+ expected_responses = methods.HalfDuplexCall(
+ half_duplex_request_iterator(), 'not a real RpcContext!')
+ for check in itertools.izip_longest(expected_responses, responses):
+ expected_response, response = check
+ self.assertEqual(expected_response, response)
+
+ def testHalfDuplexCallWedged(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ condition = threading.Condition()
+ wait_cell = [False]
+ @contextlib.contextmanager
+ def wait(): # pylint: disable=invalid-name
+ # Where's Python 3's 'nonlocal' statement when you need it?
+ with condition:
+ wait_cell[0] = True
+ yield
+ with condition:
+ wait_cell[0] = False
+ condition.notify_all()
+ def half_duplex_request_iterator():
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=1, interval_us=0)
+ yield request
+ with condition:
+ while wait_cell[0]:
+ condition.wait()
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ with wait():
+ responses = stub.HalfDuplexCall(
+ half_duplex_request_iterator(), SHORT_TIMEOUT)
+ # half-duplex waits for the client to send all info
+ with self.assertRaises(exceptions.ExpirationError):
+ next(responses)
+
+
+if __name__ == '__main__':
+ os.chdir(os.path.dirname(sys.argv[0]))
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/test.proto b/src/python/grpcio_test/grpc_protoc_plugin/test.proto
new file mode 100644
index 0000000000..ed7c6a7b79
--- /dev/null
+++ b/src/python/grpcio_test/grpc_protoc_plugin/test.proto
@@ -0,0 +1,139 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+// An integration test service that covers all the method signature permutations
+// of unary/streaming requests/responses.
+// This file is duplicated around the code base. See GitHub issue #526.
+syntax = "proto2";
+
+package grpc.testing;
+
+enum PayloadType {
+ // Compressable text format.
+ COMPRESSABLE= 1;
+
+ // Uncompressable binary format.
+ UNCOMPRESSABLE = 2;
+
+ // Randomly chosen from all other formats defined in this enum.
+ RANDOM = 3;
+}
+
+message Payload {
+ required PayloadType payload_type = 1;
+ oneof payload_body {
+ string payload_compressable = 2;
+ bytes payload_uncompressable = 3;
+ }
+}
+
+message SimpleRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, server randomly chooses one from other formats.
+ optional PayloadType response_type = 1 [default=COMPRESSABLE];
+
+ // Desired payload size in the response from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ optional int32 response_size = 2;
+
+ // Optional input payload sent along with the request.
+ optional Payload payload = 3;
+}
+
+message SimpleResponse {
+ optional Payload payload = 1;
+}
+
+message StreamingInputCallRequest {
+ // Optional input payload sent along with the request.
+ optional Payload payload = 1;
+
+ // Not expecting any payload from the response.
+}
+
+message StreamingInputCallResponse {
+ // Aggregated size of payloads received from the client.
+ optional int32 aggregated_payload_size = 1;
+}
+
+message ResponseParameters {
+ // Desired payload sizes in responses from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ required int32 size = 1;
+
+ // Desired interval between consecutive responses in the response stream in
+ // microseconds.
+ required int32 interval_us = 2;
+}
+
+message StreamingOutputCallRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, the payload from each response in the stream
+ // might be of different types. This is to simulate a mixed type of payload
+ // stream.
+ optional PayloadType response_type = 1 [default=COMPRESSABLE];
+
+ repeated ResponseParameters response_parameters = 2;
+
+ // Optional input payload sent along with the request.
+ optional Payload payload = 3;
+}
+
+message StreamingOutputCallResponse {
+ optional Payload payload = 1;
+}
+
+service TestService {
+ // One request followed by one response.
+ // The server returns the client payload as-is.
+ rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
+
+ // One request followed by a sequence of responses (streamed download).
+ // The server returns the payload with client desired type and sizes.
+ rpc StreamingOutputCall(StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+
+ // A sequence of requests followed by one response (streamed upload).
+ // The server returns the aggregated size of client payload as the result.
+ rpc StreamingInputCall(stream StreamingInputCallRequest)
+ returns (StreamingInputCallResponse);
+
+ // A sequence of requests with each request served by the server immediately.
+ // As one request could lead to multiple responses, this interface
+ // demonstrates the idea of full duplexing.
+ rpc FullDuplexCall(stream StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+
+ // A sequence of requests followed by a sequence of responses.
+ // The server buffers all the client requests and then serves them in order. A
+ // stream of responses are returned to the client when the server starts with
+ // first request.
+ rpc HalfDuplexCall(stream StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+}
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
index 9a8edfad0c..44fe760fbc 100644
--- a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
+++ b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
@@ -31,11 +31,12 @@ import threading
import time
import unittest
+from grpc import _grpcio_metadata
from grpc._adapter import _types
from grpc._adapter import _low
-def WaitForEvents(completion_queues, deadline):
+def wait_for_events(completion_queues, deadline):
"""
Args:
completion_queues: list of completion queues to wait for events on
@@ -62,6 +63,7 @@ def WaitForEvents(completion_queues, deadline):
thread.join()
return results
+
class InsecureServerInsecureClient(unittest.TestCase):
def setUp(self):
@@ -115,7 +117,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
client_initial_metadata = [(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]
client_start_batch_result = client_call.start_batch([
_types.OpArgs.send_initial_metadata(client_initial_metadata),
- _types.OpArgs.send_message(REQUEST),
+ _types.OpArgs.send_message(REQUEST, 0),
_types.OpArgs.send_close_from_client(),
_types.OpArgs.recv_initial_metadata(),
_types.OpArgs.recv_message(),
@@ -123,20 +125,34 @@ class InsecureServerInsecureClient(unittest.TestCase):
], client_call_tag)
self.assertEquals(_types.CallError.OK, client_start_batch_result)
- client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
+ client_no_event, request_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
self.assertEquals(client_no_event, None)
self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type)
self.assertIsInstance(request_event.call, _low.Call)
self.assertIs(server_request_tag, request_event.tag)
self.assertEquals(1, len(request_event.results))
- got_initial_metadata = dict(request_event.results[0].initial_metadata)
+ received_initial_metadata = dict(request_event.results[0].initial_metadata)
+ # Check that our metadata were transmitted
self.assertEquals(
dict(client_initial_metadata),
- dict((x, got_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
+ dict((x, received_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
+ # Check that Python's user agent string is a part of the full user agent
+ # string
+ self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__),
+ received_initial_metadata['user-agent'])
self.assertEquals(METHOD, request_event.call_details.method)
self.assertEquals(HOST, request_event.call_details.host)
self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE)
+ # Check that the channel is connected, and that both it and the call have
+ # the proper target and peer; do this after the first flurry of messages to
+ # avoid the possibility that connection was delayed by the core until the
+ # first message was sent.
+ self.assertEqual(_types.ConnectivityState.READY,
+ self.client_channel.check_connectivity_state(False))
+ self.assertIsNotNone(self.client_channel.target())
+ self.assertIsNotNone(client_call.peer())
+
server_call_tag = object()
server_call = request_event.call
server_initial_metadata = [(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE)]
@@ -144,13 +160,13 @@ class InsecureServerInsecureClient(unittest.TestCase):
server_start_batch_result = server_call.start_batch([
_types.OpArgs.send_initial_metadata(server_initial_metadata),
_types.OpArgs.recv_message(),
- _types.OpArgs.send_message(RESPONSE),
+ _types.OpArgs.send_message(RESPONSE, 0),
_types.OpArgs.recv_close_on_server(),
_types.OpArgs.send_status_from_server(server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
], server_call_tag)
self.assertEquals(_types.CallError.OK, server_start_batch_result)
- client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
+ client_event, server_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
self.assertEquals(6, len(client_event.results))
found_client_op_types = set()
diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
new file mode 100644
index 0000000000..72b1ae5642
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
@@ -0,0 +1,165 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests the RPC Framework Core's implementation of the Base interface."""
+
+import collections
+import logging
+import random
+import time
+import unittest
+
+from grpc._adapter import _intermediary_low
+from grpc._links import invocation
+from grpc._links import service
+from grpc.framework.core import implementations
+from grpc.framework.interfaces.base import utilities
+from grpc_test import test_common as grpc_test_common
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import test_cases
+from grpc_test.framework.interfaces.base import test_interfaces
+
+_INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),)
+_SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),)
+_SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),)
+_CODE = _intermediary_low.Code.OK
+_MESSAGE = b'test message'
+
+
+class _SerializationBehaviors(
+ collections.namedtuple(
+ '_SerializationBehaviors',
+ ('request_serializers', 'request_deserializers', 'response_serializers',
+ 'response_deserializers',))):
+ pass
+
+
+class _Links(
+ collections.namedtuple(
+ '_Links',
+ ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link',
+ 'service_end_link'))):
+ pass
+
+
+def _serialization_behaviors_from_serializations(serializations):
+ request_serializers = {}
+ request_deserializers = {}
+ response_serializers = {}
+ response_deserializers = {}
+ for (group, method), serialization in serializations.iteritems():
+ request_serializers[group, method] = serialization.serialize_request
+ request_deserializers[group, method] = serialization.deserialize_request
+ response_serializers[group, method] = serialization.serialize_response
+ response_deserializers[group, method] = serialization.deserialize_response
+ return _SerializationBehaviors(
+ request_serializers, request_deserializers, response_serializers,
+ response_deserializers)
+
+
+class _Implementation(test_interfaces.Implementation):
+
+ def instantiate(self, serializations, servicer):
+ serialization_behaviors = _serialization_behaviors_from_serializations(
+ serializations)
+ invocation_end_link = implementations.invocation_end_link()
+ service_end_link = implementations.service_end_link(
+ servicer, test_constants.DEFAULT_TIMEOUT,
+ test_constants.MAXIMUM_TIMEOUT)
+ service_grpc_link = service.service_link(
+ serialization_behaviors.request_deserializers,
+ serialization_behaviors.response_serializers)
+ port = service_grpc_link.add_port(0, None)
+ channel = _intermediary_low.Channel('localhost:%d' % port, None)
+ invocation_grpc_link = invocation.invocation_link(
+ channel, b'localhost',
+ serialization_behaviors.request_serializers,
+ serialization_behaviors.response_deserializers)
+
+ invocation_end_link.join_link(invocation_grpc_link)
+ invocation_grpc_link.join_link(invocation_end_link)
+ service_end_link.join_link(service_grpc_link)
+ service_grpc_link.join_link(service_end_link)
+ invocation_grpc_link.start()
+ service_grpc_link.start()
+ return invocation_end_link, service_end_link, (
+ invocation_grpc_link, service_grpc_link)
+
+ def destantiate(self, memo):
+ invocation_grpc_link, service_grpc_link = memo
+ invocation_grpc_link.stop()
+ service_grpc_link.stop_gracefully()
+
+ def invocation_initial_metadata(self):
+ return _INVOCATION_INITIAL_METADATA
+
+ def service_initial_metadata(self):
+ return _SERVICE_INITIAL_METADATA
+
+ def invocation_completion(self):
+ return utilities.completion(None, None, None)
+
+ def service_completion(self):
+ return utilities.completion(_SERVICE_TERMINAL_METADATA, _CODE, _MESSAGE)
+
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ return original_metadata is None or grpc_test_common.metadata_transmitted(
+ original_metadata, transmitted_metadata)
+
+ def completion_transmitted(self, original_completion, transmitted_completion):
+ if (original_completion.terminal_metadata is not None and
+ not grpc_test_common.metadata_transmitted(
+ original_completion.terminal_metadata,
+ transmitted_completion.terminal_metadata)):
+ return False
+ elif original_completion.code is not transmitted_completion.code:
+ return False
+ elif original_completion.message != transmitted_completion.message:
+ return False
+ else:
+ return True
+
+
+def setUpModule():
+ logging.warn('setUpModule!')
+
+
+def tearDownModule():
+ logging.warn('tearDownModule!')
+
+
+def load_tests(loader, tests, pattern):
+ return unittest.TestSuite(
+ tests=tuple(
+ loader.loadTestsFromTestCase(test_case_class)
+ for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/framework/common/test_constants.py b/src/python/grpcio_test/grpc_test/framework/common/test_constants.py
index 3126d0d82c..e1d3c2709d 100644
--- a/src/python/grpcio_test/grpc_test/framework/common/test_constants.py
+++ b/src/python/grpcio_test/grpc_test/framework/common/test_constants.py
@@ -29,15 +29,25 @@
"""Constants shared among tests throughout RPC Framework."""
+# Value for maximum duration in seconds that a test is allowed for its actual
+# behavioral logic, excluding all time spent deliberately waiting in the test.
+TIME_ALLOWANCE = 10
# Value for maximum duration in seconds of RPCs that may time out as part of a
# test.
SHORT_TIMEOUT = 4
# Absurdly large value for maximum duration in seconds for should-not-time-out
# RPCs made during tests.
LONG_TIMEOUT = 3000
+# Values to supply on construction of an object that will service RPCs; these
+# should not be used as the actual timeout values of any RPCs made during tests.
+DEFAULT_TIMEOUT = 300
+MAXIMUM_TIMEOUT = 3600
# The number of payloads to transmit in streaming tests.
STREAM_LENGTH = 200
+# The size of payloads to transmit in tests.
+PAYLOAD_SIZE = 256 * 1024 + 17
+
# The size of thread pools to use in tests.
POOL_SIZE = 10
diff --git a/src/python/grpcio_test/grpc_test/framework/core/__init__.py b/src/python/grpcio_test/grpc_test/framework/core/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/core/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py
new file mode 100644
index 0000000000..8d72f131d5
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py
@@ -0,0 +1,96 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests the RPC Framework Core's implementation of the Base interface."""
+
+import logging
+import random
+import time
+import unittest
+
+from grpc.framework.core import implementations
+from grpc.framework.interfaces.base import utilities
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import test_cases
+from grpc_test.framework.interfaces.base import test_interfaces
+
+
+class _Implementation(test_interfaces.Implementation):
+
+ def __init__(self):
+ self._invocation_initial_metadata = object()
+ self._service_initial_metadata = object()
+ self._invocation_terminal_metadata = object()
+ self._service_terminal_metadata = object()
+
+ def instantiate(self, serializations, servicer):
+ invocation = implementations.invocation_end_link()
+ service = implementations.service_end_link(
+ servicer, test_constants.DEFAULT_TIMEOUT,
+ test_constants.MAXIMUM_TIMEOUT)
+ invocation.join_link(service)
+ service.join_link(invocation)
+ return invocation, service, None
+
+ def destantiate(self, memo):
+ pass
+
+ def invocation_initial_metadata(self):
+ return self._invocation_initial_metadata
+
+ def service_initial_metadata(self):
+ return self._service_initial_metadata
+
+ def invocation_completion(self):
+ return utilities.completion(self._invocation_terminal_metadata, None, None)
+
+ def service_completion(self):
+ return utilities.completion(self._service_terminal_metadata, None, None)
+
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ return transmitted_metadata is original_metadata
+
+ def completion_transmitted(self, original_completion, transmitted_completion):
+ return (
+ (original_completion.terminal_metadata is
+ transmitted_completion.terminal_metadata) and
+ original_completion.code is transmitted_completion.code and
+ original_completion.message is transmitted_completion.message
+ )
+
+
+def load_tests(loader, tests, pattern):
+ return unittest.TestSuite(
+ tests=tuple(
+ loader.loadTestsFromTestCase(test_case_class)
+ for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py
index 7e1158f96b..251e1eb68e 100644
--- a/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py
+++ b/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py
@@ -34,15 +34,13 @@ import abc
import unittest # pylint: disable=unused-import
from grpc.framework.face import exceptions
+from grpc_test.framework.common import test_constants
from grpc_test.framework.face.testing import control
from grpc_test.framework.face.testing import coverage
from grpc_test.framework.face.testing import digest
from grpc_test.framework.face.testing import stock_service
from grpc_test.framework.face.testing import test_case
-_TIMEOUT = 3
-_LONG_TIMEOUT = 45
-
class BlockingInvocationInlineServiceTestCase(
test_case.FaceTestCase, coverage.BlockingCoverage):
@@ -79,7 +77,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response = self.stub.blocking_value_in_value_out(
- name, request, _LONG_TIMEOUT)
+ name, request, test_constants.LONG_TIMEOUT)
test_messages.verify(request, response, self)
@@ -90,7 +88,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _LONG_TIMEOUT)
+ name, request, test_constants.LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
@@ -102,7 +100,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response = self.stub.blocking_stream_in_value_out(
- name, iter(requests), _LONG_TIMEOUT)
+ name, iter(requests), test_constants.LONG_TIMEOUT)
test_messages.verify(requests, response, self)
@@ -113,7 +111,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _LONG_TIMEOUT)
+ name, iter(requests), test_constants.LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)
@@ -126,12 +124,12 @@ class BlockingInvocationInlineServiceTestCase(
second_request = test_messages.request()
first_response = self.stub.blocking_value_in_value_out(
- name, first_request, _TIMEOUT)
+ name, first_request, test_constants.SHORT_TIMEOUT)
test_messages.verify(first_request, first_response, self)
second_response = self.stub.blocking_value_in_value_out(
- name, second_request, _TIMEOUT)
+ name, second_request, test_constants.SHORT_TIMEOUT)
test_messages.verify(second_request, second_response, self)
@@ -144,7 +142,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
multi_callable = self.stub.unary_unary_multi_callable(name)
- multi_callable(request, _TIMEOUT)
+ multi_callable(request, test_constants.SHORT_TIMEOUT)
def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -155,7 +153,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testExpiredStreamRequestUnaryResponse(self):
@@ -167,7 +165,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
multi_callable = self.stub.stream_unary_multi_callable(name)
- multi_callable(iter(requests), _TIMEOUT)
+ multi_callable(iter(requests), test_constants.SHORT_TIMEOUT)
def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -178,7 +176,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testFailedUnaryRequestUnaryResponse(self):
@@ -188,7 +186,8 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
- self.stub.blocking_value_in_value_out(name, request, _TIMEOUT)
+ self.stub.blocking_value_in_value_out(name, request,
+ test_constants.SHORT_TIMEOUT)
def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -198,7 +197,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testFailedStreamRequestUnaryResponse(self):
@@ -208,7 +207,8 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
- self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT)
+ self.stub.blocking_stream_in_value_out(name, iter(requests),
+ test_constants.SHORT_TIMEOUT)
def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -218,5 +218,5 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
list(response_iterator)
diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py
index 18eed53d6e..9df77678eb 100644
--- a/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py
+++ b/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py
@@ -33,6 +33,7 @@ import abc
import unittest
from grpc.framework.face import interfaces
+from grpc_test.framework.common import test_constants
from grpc_test.framework.face.testing import callback as testing_callback
from grpc_test.framework.face.testing import control
from grpc_test.framework.face.testing import coverage
@@ -40,8 +41,6 @@ from grpc_test.framework.face.testing import digest
from grpc_test.framework.face.testing import stock_service
from grpc_test.framework.face.testing import test_case
-_TIMEOUT = 3
-
class EventInvocationSynchronousEventServiceTestCase(
test_case.FaceTestCase, coverage.FullCoverage):
@@ -79,7 +78,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort, _TIMEOUT)
+ name, request, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
response = callback.response()
@@ -93,7 +93,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort, _TIMEOUT)
+ name, request, callback, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
responses = callback.responses()
@@ -107,7 +108,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort, _TIMEOUT)
+ name, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
@@ -124,7 +126,7 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, _TIMEOUT)
+ name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
@@ -147,11 +149,11 @@ class EventInvocationSynchronousEventServiceTestCase(
first_callback.complete(first_response)
self.stub.event_value_in_value_out(
name, second_request, second_callback.complete,
- second_callback.abort, _TIMEOUT)
+ second_callback.abort, test_constants.SHORT_TIMEOUT)
self.stub.event_value_in_value_out(
name, first_request, make_second_invocation, first_callback.abort,
- _TIMEOUT)
+ test_constants.SHORT_TIMEOUT)
second_callback.block_until_terminated()
first_response = first_callback.response()
@@ -168,7 +170,8 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.pause():
self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort, _TIMEOUT)
+ name, request, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
@@ -182,7 +185,8 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.pause():
self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort, _TIMEOUT)
+ name, request, callback, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
@@ -194,7 +198,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort, _TIMEOUT)
+ name, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
@@ -207,7 +212,7 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, _TIMEOUT)
+ name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
callback.block_until_terminated()
@@ -223,10 +228,12 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort, _TIMEOUT)
+ name, request, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
- self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
+ self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
+ callback.abortion())
def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -237,10 +244,12 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort, _TIMEOUT)
+ name, request, callback, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
- self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
+ self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
+ callback.abortion())
def testFailedStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
@@ -251,13 +260,15 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
unused_call, request_consumer = self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort, _TIMEOUT)
+ name, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
callback.block_until_terminated()
- self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
+ self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
+ callback.abortion())
def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -268,7 +279,7 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, _TIMEOUT)
+ name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
@@ -287,10 +298,10 @@ class EventInvocationSynchronousEventServiceTestCase(
self.stub.event_value_in_value_out(
name, first_request, first_callback.complete, first_callback.abort,
- _TIMEOUT)
+ test_constants.SHORT_TIMEOUT)
self.stub.event_value_in_value_out(
name, second_request, second_callback.complete,
- second_callback.abort, _TIMEOUT)
+ second_callback.abort, test_constants.SHORT_TIMEOUT)
first_callback.block_until_terminated()
second_callback.block_until_terminated()
@@ -312,7 +323,8 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.pause():
call = self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort, _TIMEOUT)
+ name, request, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
call.cancel()
callback.block_until_terminated()
@@ -326,7 +338,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
call = self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort, _TIMEOUT)
+ name, request, callback, callback.abort,
+ test_constants.SHORT_TIMEOUT)
call.cancel()
callback.block_until_terminated()
@@ -340,7 +353,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
call, request_consumer = self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort, _TIMEOUT)
+ name, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
call.cancel()
@@ -355,7 +369,7 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
call, unused_request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, _TIMEOUT)
+ name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
call.cancel()
callback.block_until_terminated()
diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
index 3b42914342..70d86a0422 100644
--- a/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
+++ b/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
@@ -37,13 +37,13 @@ import unittest
from grpc.framework.face import exceptions
from grpc.framework.foundation import future
from grpc.framework.foundation import logging_pool
+from grpc_test.framework.common import test_constants
from grpc_test.framework.face.testing import control
from grpc_test.framework.face.testing import coverage
from grpc_test.framework.face.testing import digest
from grpc_test.framework.face.testing import stock_service
from grpc_test.framework.face.testing import test_case
-_TIMEOUT = 3
_MAXIMUM_POOL_SIZE = 10
@@ -110,7 +110,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
request = test_messages.request()
response_future = self.stub.future_value_in_value_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
response = response_future.result()
test_messages.verify(request, response, self)
@@ -122,7 +122,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
@@ -138,7 +138,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# returned to calling code before the iterator yields any requests.
with request_iterator.pause():
response_future = self.stub.future_stream_in_value_out(
- name, request_iterator, _TIMEOUT)
+ name, request_iterator, test_constants.SHORT_TIMEOUT)
response = response_future.result()
test_messages.verify(requests, response, self)
@@ -154,7 +154,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# returned to calling code before the iterator yields any requests.
with request_iterator.pause():
response_iterator = self.stub.inline_stream_in_stream_out(
- name, request_iterator, _TIMEOUT)
+ name, request_iterator, test_constants.SHORT_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)
@@ -167,13 +167,13 @@ class FutureInvocationAsynchronousEventServiceTestCase(
second_request = test_messages.request()
first_response_future = self.stub.future_value_in_value_out(
- name, first_request, _TIMEOUT)
+ name, first_request, test_constants.SHORT_TIMEOUT)
first_response = first_response_future.result()
test_messages.verify(first_request, first_response, self)
second_response_future = self.stub.future_value_in_value_out(
- name, second_request, _TIMEOUT)
+ name, second_request, test_constants.SHORT_TIMEOUT)
second_response = second_response_future.result()
test_messages.verify(second_request, second_response, self)
@@ -186,7 +186,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
multi_callable = self.stub.unary_unary_multi_callable(name)
- response_future = multi_callable.future(request, _TIMEOUT)
+ response_future = multi_callable.future(request,
+ test_constants.SHORT_TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
@@ -200,7 +201,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(response_iterator)
@@ -212,7 +213,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
multi_callable = self.stub.stream_unary_multi_callable(name)
- response_future = multi_callable.future(iter(requests), _TIMEOUT)
+ response_future = multi_callable.future(iter(requests),
+ test_constants.SHORT_TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
@@ -226,7 +228,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(response_iterator)
@@ -238,7 +240,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.fail():
response_future = self.stub.future_value_in_value_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is
@@ -261,7 +263,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# expiration of the RPC.
with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testFailedStreamRequestUnaryResponse(self):
@@ -272,7 +274,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.fail():
response_future = self.stub.future_stream_in_value_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is
@@ -295,7 +297,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# expiration of the RPC.
with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testParallelInvocations(self):
@@ -305,10 +307,11 @@ class FutureInvocationAsynchronousEventServiceTestCase(
first_request = test_messages.request()
second_request = test_messages.request()
+ # TODO(bug 2039): use LONG_TIMEOUT instead
first_response_future = self.stub.future_value_in_value_out(
- name, first_request, _TIMEOUT)
+ name, first_request, test_constants.SHORT_TIMEOUT)
second_response_future = self.stub.future_value_in_value_out(
- name, second_request, _TIMEOUT)
+ name, second_request, test_constants.SHORT_TIMEOUT)
first_response = first_response_future.result()
second_response = second_response_future.result()
@@ -327,7 +330,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_future = self.stub.future_value_in_value_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
cancel_method_return_value = response_future.cancel()
self.assertFalse(cancel_method_return_value)
@@ -341,7 +344,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
response_iterator.cancel()
with self.assertRaises(future.CancelledError):
@@ -355,7 +358,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_future = self.stub.future_stream_in_value_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
cancel_method_return_value = response_future.cancel()
self.assertFalse(cancel_method_return_value)
@@ -369,7 +372,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
response_iterator.cancel()
with self.assertRaises(future.CancelledError):
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py
new file mode 100644
index 0000000000..e4d2a7a0d7
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py
@@ -0,0 +1,568 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Part of the tests of the base interface of RPC Framework."""
+
+import abc
+import collections
+import enum
+import random # pylint: disable=unused-import
+import threading
+import time
+
+from grpc.framework.interfaces.base import base
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import _sequence
+from grpc_test.framework.interfaces.base import _state
+from grpc_test.framework.interfaces.base import test_interfaces # pylint: disable=unused-import
+
+_GROUP = 'base test cases test group'
+_METHOD = 'base test cases test method'
+
+_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE = test_constants.PAYLOAD_SIZE / 20
+_MINIMUM_PAYLOAD_SIZE = test_constants.PAYLOAD_SIZE / 600
+
+
+def _create_payload(randomness):
+ length = randomness.randint(
+ _MINIMUM_PAYLOAD_SIZE, test_constants.PAYLOAD_SIZE)
+ random_section_length = randomness.randint(
+ 0, min(_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE, length))
+ random_section = bytes(
+ bytearray(
+ randomness.getrandbits(8) for _ in range(random_section_length)))
+ sevens_section = '\x07' * (length - random_section_length)
+ return b''.join(randomness.sample((random_section, sevens_section), 2))
+
+
+def _anything_in_flight(state):
+ return (
+ state.invocation_initial_metadata_in_flight is not None or
+ state.invocation_payloads_in_flight or
+ state.invocation_completion_in_flight is not None or
+ state.service_initial_metadata_in_flight is not None or
+ state.service_payloads_in_flight or
+ state.service_completion_in_flight is not None or
+ 0 < state.invocation_allowance_in_flight or
+ 0 < state.service_allowance_in_flight
+ )
+
+
+def _verify_service_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, state, implementation):
+ if initial_metadata is not None:
+ if state.invocation_initial_metadata_received:
+ return 'Later invocation initial metadata received: %s' % (
+ initial_metadata,)
+ if state.invocation_payloads_received:
+ return 'Invocation initial metadata received after payloads: %s' % (
+ state.invocation_payloads_received)
+ if state.invocation_completion_received:
+ return 'Invocation initial metadata received after invocation completion!'
+ if not implementation.metadata_transmitted(
+ state.invocation_initial_metadata_in_flight, initial_metadata):
+ return 'Invocation initial metadata maltransmitted: %s, %s' % (
+ state.invocation_initial_metadata_in_flight, initial_metadata)
+ else:
+ state.invocation_initial_metadata_in_flight = None
+ state.invocation_initial_metadata_received = True
+
+ if payload is not None:
+ if state.invocation_completion_received:
+ return 'Invocation payload received after invocation completion!'
+ elif not state.invocation_payloads_in_flight:
+ return 'Invocation payload "%s" received but not in flight!' % (payload,)
+ elif state.invocation_payloads_in_flight[0] != payload:
+ return 'Invocation payload mismatch: %s, %s' % (
+ state.invocation_payloads_in_flight[0], payload)
+ elif state.service_side_invocation_allowance < 1:
+ return 'Disallowed invocation payload!'
+ else:
+ state.invocation_payloads_in_flight.pop(0)
+ state.invocation_payloads_received += 1
+ state.service_side_invocation_allowance -= 1
+
+ if completion is not None:
+ if state.invocation_completion_received:
+ return 'Later invocation completion received: %s' % (completion,)
+ elif not implementation.completion_transmitted(
+ state.invocation_completion_in_flight, completion):
+ return 'Invocation completion maltransmitted: %s, %s' % (
+ state.invocation_completion_in_flight, completion)
+ else:
+ state.invocation_completion_in_flight = None
+ state.invocation_completion_received = True
+
+ if allowance is not None:
+ if allowance <= 0:
+ return 'Illegal allowance value: %s' % (allowance,)
+ else:
+ state.service_allowance_in_flight -= allowance
+ state.service_side_service_allowance += allowance
+
+
+def _verify_invocation_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, state, implementation):
+ if initial_metadata is not None:
+ if state.service_initial_metadata_received:
+ return 'Later service initial metadata received: %s' % (initial_metadata,)
+ if state.service_payloads_received:
+ return 'Service initial metadata received after service payloads: %s' % (
+ state.service_payloads_received)
+ if state.service_completion_received:
+ return 'Service initial metadata received after service completion!'
+ if not implementation.metadata_transmitted(
+ state.service_initial_metadata_in_flight, initial_metadata):
+ return 'Service initial metadata maltransmitted: %s, %s' % (
+ state.service_initial_metadata_in_flight, initial_metadata)
+ else:
+ state.service_initial_metadata_in_flight = None
+ state.service_initial_metadata_received = True
+
+ if payload is not None:
+ if state.service_completion_received:
+ return 'Service payload received after service completion!'
+ elif not state.service_payloads_in_flight:
+ return 'Service payload "%s" received but not in flight!' % (payload,)
+ elif state.service_payloads_in_flight[0] != payload:
+ return 'Service payload mismatch: %s, %s' % (
+ state.invocation_payloads_in_flight[0], payload)
+ elif state.invocation_side_service_allowance < 1:
+ return 'Disallowed service payload!'
+ else:
+ state.service_payloads_in_flight.pop(0)
+ state.service_payloads_received += 1
+ state.invocation_side_service_allowance -= 1
+
+ if completion is not None:
+ if state.service_completion_received:
+ return 'Later service completion received: %s' % (completion,)
+ elif not implementation.completion_transmitted(
+ state.service_completion_in_flight, completion):
+ return 'Service completion maltransmitted: %s, %s' % (
+ state.service_completion_in_flight, completion)
+ else:
+ state.service_completion_in_flight = None
+ state.service_completion_received = True
+
+ if allowance is not None:
+ if allowance <= 0:
+ return 'Illegal allowance value: %s' % (allowance,)
+ else:
+ state.invocation_allowance_in_flight -= allowance
+ state.invocation_side_service_allowance += allowance
+
+
+class Invocation(
+ collections.namedtuple(
+ 'Invocation',
+ ('group', 'method', 'subscription_kind', 'timeout', 'initial_metadata',
+ 'payload', 'completion',))):
+ """A description of operation invocation.
+
+ Attributes:
+ group: The group identifier for the operation.
+ method: The method identifier for the operation.
+ subscription_kind: A base.Subscription.Kind value describing the kind of
+ subscription to use for the operation.
+ timeout: A duration in seconds to pass as the timeout value for the
+ operation.
+ initial_metadata: An object to pass as the initial metadata for the
+ operation or None.
+ payload: An object to pass as a payload value for the operation or None.
+ completion: An object to pass as a completion value for the operation or
+ None.
+ """
+
+
+class OnAdvance(
+ collections.namedtuple(
+ 'OnAdvance',
+ ('kind', 'initial_metadata', 'payload', 'completion', 'allowance'))):
+ """Describes action to be taken in a test in response to an advance call.
+
+ Attributes:
+ kind: A Kind value describing the overall kind of response.
+ initial_metadata: An initial metadata value to pass to a call of the advance
+ method of the operator under test. Only valid if kind is Kind.ADVANCE and
+ may be None.
+ payload: A payload value to pass to a call of the advance method of the
+ operator under test. Only valid if kind is Kind.ADVANCE and may be None.
+ completion: A base.Completion value to pass to a call of the advance method
+ of the operator under test. Only valid if kind is Kind.ADVANCE and may be
+ None.
+ allowance: An allowance value to pass to a call of the advance method of the
+ operator under test. Only valid if kind is Kind.ADVANCE and may be None.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ ADVANCE = 'advance'
+ DEFECT = 'defect'
+ IDLE = 'idle'
+
+
+_DEFECT_ON_ADVANCE = OnAdvance(OnAdvance.Kind.DEFECT, None, None, None, None)
+_IDLE_ON_ADVANCE = OnAdvance(OnAdvance.Kind.IDLE, None, None, None, None)
+
+
+class Instruction(
+ collections.namedtuple(
+ 'Instruction',
+ ('kind', 'advance_args', 'advance_kwargs', 'conclude_success',
+ 'conclude_message', 'conclude_invocation_outcome',
+ 'conclude_service_outcome',))):
+ """"""
+
+ @enum.unique
+ class Kind(enum.Enum):
+ ADVANCE = 'ADVANCE'
+ CANCEL = 'CANCEL'
+ CONCLUDE = 'CONCLUDE'
+
+
+class Controller(object):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def failed(self, message):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_request(self, request):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_request(self, serialized_request):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_response(self, response):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_response(self, serialized_response):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation(self):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def poll(self):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def on_service_advance(
+ self, initial_metadata, payload, completion, allowance):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def on_invocation_advance(
+ self, initial_metadata, payload, completion, allowance):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def service_on_termination(self, outcome):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation_on_termination(self, outcome):
+ """"""
+ raise NotImplementedError()
+
+
+class ControllerCreator(object):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def name(self):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def controller(self, implementation, randomness):
+ """"""
+ raise NotImplementedError()
+
+
+class _Remainder(
+ collections.namedtuple(
+ '_Remainder',
+ ('invocation_payloads', 'service_payloads', 'invocation_completion',
+ 'service_completion',))):
+ """Describes work remaining to be done in a portion of a test.
+
+ Attributes:
+ invocation_payloads: The number of payloads to be sent from the invocation
+ side of the operation to the service side of the operation.
+ service_payloads: The number of payloads to be sent from the service side of
+ the operation to the invocation side of the operation.
+ invocation_completion: Whether or not completion from the invocation side of
+ the operation should be indicated and has yet to be indicated.
+ service_completion: Whether or not completion from the service side of the
+ operation should be indicated and has yet to be indicated.
+ """
+
+
+class _SequenceController(Controller):
+
+ def __init__(self, sequence, implementation, randomness):
+ """Constructor.
+
+ Args:
+ sequence: A _sequence.Sequence describing the steps to be taken in the
+ test at a relatively high level.
+ implementation: A test_interfaces.Implementation encapsulating the
+ base interface implementation that is the system under test.
+ randomness: A random.Random instance for use in the test.
+ """
+ self._condition = threading.Condition()
+ self._sequence = sequence
+ self._implementation = implementation
+ self._randomness = randomness
+
+ self._until = None
+ self._remaining_elements = None
+ self._poll_next = None
+ self._message = None
+
+ self._state = _state.OperationState()
+ self._todo = None
+
+ # called with self._condition
+ def _failed(self, message):
+ self._message = message
+ self._condition.notify_all()
+
+ def _passed(self, invocation_outcome, service_outcome):
+ self._poll_next = Instruction(
+ Instruction.Kind.CONCLUDE, None, None, True, None, invocation_outcome,
+ service_outcome)
+ self._condition.notify_all()
+
+ def failed(self, message):
+ with self._condition:
+ self._failed(message)
+
+ def serialize_request(self, request):
+ return request + request
+
+ def deserialize_request(self, serialized_request):
+ return serialized_request[:len(serialized_request) / 2]
+
+ def serialize_response(self, response):
+ return response * 3
+
+ def deserialize_response(self, serialized_response):
+ return serialized_response[2 * len(serialized_response) / 3:]
+
+ def invocation(self):
+ with self._condition:
+ self._until = time.time() + self._sequence.maximum_duration
+ self._remaining_elements = list(self._sequence.elements)
+ if self._sequence.invocation.initial_metadata:
+ initial_metadata = self._implementation.invocation_initial_metadata()
+ self._state.invocation_initial_metadata_in_flight = initial_metadata
+ else:
+ initial_metadata = None
+ if self._sequence.invocation.payload:
+ payload = _create_payload(self._randomness)
+ self._state.invocation_payloads_in_flight.append(payload)
+ else:
+ payload = None
+ if self._sequence.invocation.complete:
+ completion = self._implementation.invocation_completion()
+ self._state.invocation_completion_in_flight = completion
+ else:
+ completion = None
+ return Invocation(
+ _GROUP, _METHOD, base.Subscription.Kind.FULL,
+ self._sequence.invocation.timeout, initial_metadata, payload,
+ completion)
+
+ def poll(self):
+ with self._condition:
+ while True:
+ if self._message is not None:
+ return Instruction(
+ Instruction.Kind.CONCLUDE, None, None, False, self._message, None,
+ None)
+ elif self._poll_next:
+ poll_next = self._poll_next
+ self._poll_next = None
+ return poll_next
+ elif self._until < time.time():
+ return Instruction(
+ Instruction.Kind.CONCLUDE, None, None, False,
+ 'overran allotted time!', None, None)
+ else:
+ self._condition.wait(timeout=self._until-time.time())
+
+ def on_service_advance(
+ self, initial_metadata, payload, completion, allowance):
+ with self._condition:
+ message = _verify_service_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, self._state,
+ self._implementation)
+ if message is not None:
+ self._failed(message)
+ if self._todo is not None:
+ raise ValueError('TODO!!!')
+ elif _anything_in_flight(self._state):
+ return _IDLE_ON_ADVANCE
+ elif self._remaining_elements:
+ element = self._remaining_elements.pop(0)
+ if element.kind is _sequence.Element.Kind.SERVICE_TRANSMISSION:
+ if element.transmission.initial_metadata:
+ initial_metadata = self._implementation.service_initial_metadata()
+ self._state.service_initial_metadata_in_flight = initial_metadata
+ else:
+ initial_metadata = None
+ if element.transmission.payload:
+ payload = _create_payload(self._randomness)
+ self._state.service_payloads_in_flight.append(payload)
+ self._state.service_side_service_allowance -= 1
+ else:
+ payload = None
+ if element.transmission.complete:
+ completion = self._implementation.service_completion()
+ self._state.service_completion_in_flight = completion
+ else:
+ completion = None
+ if (not self._state.invocation_completion_received and
+ 0 <= self._state.service_side_invocation_allowance):
+ allowance = 1
+ self._state.service_side_invocation_allowance += 1
+ self._state.invocation_allowance_in_flight += 1
+ else:
+ allowance = None
+ return OnAdvance(
+ OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion,
+ allowance)
+ else:
+ raise ValueError('TODO!!!')
+ else:
+ return _IDLE_ON_ADVANCE
+
+ def on_invocation_advance(
+ self, initial_metadata, payload, completion, allowance):
+ with self._condition:
+ message = _verify_invocation_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, self._state,
+ self._implementation)
+ if message is not None:
+ self._failed(message)
+ if self._todo is not None:
+ raise ValueError('TODO!!!')
+ elif _anything_in_flight(self._state):
+ return _IDLE_ON_ADVANCE
+ elif self._remaining_elements:
+ element = self._remaining_elements.pop(0)
+ if element.kind is _sequence.Element.Kind.INVOCATION_TRANSMISSION:
+ if element.transmission.initial_metadata:
+ initial_metadata = self._implementation.invocation_initial_metadata()
+ self._state.invocation_initial_metadata_in_fight = initial_metadata
+ else:
+ initial_metadata = None
+ if element.transmission.payload:
+ payload = _create_payload(self._randomness)
+ self._state.invocation_payloads_in_flight.append(payload)
+ self._state.invocation_side_invocation_allowance -= 1
+ else:
+ payload = None
+ if element.transmission.complete:
+ completion = self._implementation.invocation_completion()
+ self._state.invocation_completion_in_flight = completion
+ else:
+ completion = None
+ if (not self._state.service_completion_received and
+ 0 <= self._state.invocation_side_service_allowance):
+ allowance = 1
+ self._state.invocation_side_service_allowance += 1
+ self._state.service_allowance_in_flight += 1
+ else:
+ allowance = None
+ return OnAdvance(
+ OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion,
+ allowance)
+ else:
+ raise ValueError('TODO!!!')
+ else:
+ return _IDLE_ON_ADVANCE
+
+ def service_on_termination(self, outcome):
+ with self._condition:
+ self._state.service_side_outcome = outcome
+ if self._todo is not None or self._remaining_elements:
+ self._failed('Premature service-side outcome %s!' % (outcome,))
+ elif outcome is not self._sequence.outcome.service:
+ self._failed(
+ 'Incorrect service-side outcome: %s should have been %s' % (
+ outcome, self._sequence.outcome.service))
+ elif self._state.invocation_side_outcome is not None:
+ self._passed(self._state.invocation_side_outcome, outcome)
+
+ def invocation_on_termination(self, outcome):
+ with self._condition:
+ self._state.invocation_side_outcome = outcome
+ if self._todo is not None or self._remaining_elements:
+ self._failed('Premature invocation-side outcome %s!' % (outcome,))
+ elif outcome is not self._sequence.outcome.invocation:
+ self._failed(
+ 'Incorrect invocation-side outcome: %s should have been %s' % (
+ outcome, self._sequence.outcome.invocation))
+ elif self._state.service_side_outcome is not None:
+ self._passed(outcome, self._state.service_side_outcome)
+
+
+class _SequenceControllerCreator(ControllerCreator):
+
+ def __init__(self, sequence):
+ self._sequence = sequence
+
+ def name(self):
+ return self._sequence.name
+
+ def controller(self, implementation, randomness):
+ return _SequenceController(self._sequence, implementation, randomness)
+
+
+CONTROLLER_CREATORS = tuple(
+ _SequenceControllerCreator(sequence) for sequence in _sequence.SEQUENCES)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py
new file mode 100644
index 0000000000..1d77aaebe6
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py
@@ -0,0 +1,168 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Part of the tests of the base interface of RPC Framework."""
+
+import collections
+import enum
+
+from grpc.framework.interfaces.base import base
+from grpc_test.framework.common import test_constants
+
+
+class Invocation(
+ collections.namedtuple(
+ 'Invocation', ('timeout', 'initial_metadata', 'payload', 'complete',))):
+ """A recipe for operation invocation.
+
+ Attributes:
+ timeout: A duration in seconds to pass to the system under test as the
+ operation's timeout value.
+ initial_metadata: A boolean indicating whether or not to pass initial
+ metadata when invoking the operation.
+ payload: A boolean indicating whether or not to pass a payload when
+ invoking the operation.
+ complete: A boolean indicating whether or not to indicate completion of
+ transmissions from the invoking side of the operation when invoking the
+ operation.
+ """
+
+
+class Transmission(
+ collections.namedtuple(
+ 'Transmission', ('initial_metadata', 'payload', 'complete',))):
+ """A recipe for a single transmission in an operation.
+
+ Attributes:
+ initial_metadata: A boolean indicating whether or not to pass initial
+ metadata as part of the transmission.
+ payload: A boolean indicating whether or not to pass a payload as part of
+ the transmission.
+ complete: A boolean indicating whether or not to indicate completion of
+ transmission from the transmitting side of the operation as part of the
+ transmission.
+ """
+
+
+class Intertransmission(
+ collections.namedtuple('Intertransmission', ('invocation', 'service',))):
+ """A recipe for multiple transmissions in an operation.
+
+ Attributes:
+ invocation: An integer describing the number of payloads to send from the
+ invocation side of the operation to the service side.
+ service: An integer describing the number of payloads to send from the
+ service side of the operation to the invocation side.
+ """
+
+
+class Element(collections.namedtuple('Element', ('kind', 'transmission',))):
+ """A sum type for steps to perform when testing an operation.
+
+ Attributes:
+ kind: A Kind value describing the kind of step to perform in the test.
+ transmission: Only valid for kinds Kind.INVOCATION_TRANSMISSION and
+ Kind.SERVICE_TRANSMISSION, a Transmission value describing the details of
+ the transmission to be made.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ INVOCATION_TRANSMISSION = 'invocation transmission'
+ SERVICE_TRANSMISSION = 'service transmission'
+ INTERTRANSMISSION = 'intertransmission'
+ INVOCATION_CANCEL = 'invocation cancel'
+ SERVICE_CANCEL = 'service cancel'
+ INVOCATION_FAILURE = 'invocation failure'
+ SERVICE_FAILURE = 'service failure'
+
+
+class Outcome(collections.namedtuple('Outcome', ('invocation', 'service',))):
+ """A description of the expected outcome of an operation test.
+
+ Attributes:
+ invocation: The base.Outcome value expected on the invocation side of the
+ operation.
+ service: The base.Outcome value expected on the service side of the
+ operation.
+ """
+
+
+class Sequence(
+ collections.namedtuple(
+ 'Sequence',
+ ('name', 'maximum_duration', 'invocation', 'elements', 'outcome',))):
+ """Describes at a high level steps to perform in a test.
+
+ Attributes:
+ name: The string name of the sequence.
+ maximum_duration: A length of time in seconds to allow for the test before
+ declaring it to have failed.
+ invocation: An Invocation value describing how to invoke the operation
+ under test.
+ elements: A sequence of Element values describing at coarse granularity
+ actions to take during the operation under test.
+ outcome: An Outcome value describing the expected outcome of the test.
+ """
+
+_EASY = Sequence(
+ 'Easy',
+ test_constants.TIME_ALLOWANCE,
+ Invocation(test_constants.LONG_TIMEOUT, True, True, True),
+ (
+ Element(
+ Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, True)),
+ ),
+ Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED))
+
+_PEASY = Sequence(
+ 'Peasy',
+ test_constants.TIME_ALLOWANCE,
+ Invocation(test_constants.LONG_TIMEOUT, True, True, False),
+ (
+ Element(
+ Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, False)),
+ Element(
+ Element.Kind.INVOCATION_TRANSMISSION,
+ Transmission(False, True, True)),
+ Element(
+ Element.Kind.SERVICE_TRANSMISSION, Transmission(False, True, True)),
+ ),
+ Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED))
+
+
+# TODO(issue 2959): Finish this test suite. This tuple of sequences should
+# contain at least the values in the Cartesian product of (half-duplex,
+# full-duplex) * (zero payloads, one payload, test_constants.STREAM_LENGTH
+# payloads) * (completion, cancellation, expiration, programming defect in
+# servicer code).
+SEQUENCES = (
+ _EASY,
+ _PEASY,
+)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py
new file mode 100644
index 0000000000..21cf33aeb6
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py
@@ -0,0 +1,55 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Part of the tests of the base interface of RPC Framework."""
+
+
+class OperationState(object):
+
+ def __init__(self):
+ self.invocation_initial_metadata_in_flight = None
+ self.invocation_initial_metadata_received = False
+ self.invocation_payloads_in_flight = []
+ self.invocation_payloads_received = 0
+ self.invocation_completion_in_flight = None
+ self.invocation_completion_received = False
+ self.service_initial_metadata_in_flight = None
+ self.service_initial_metadata_received = False
+ self.service_payloads_in_flight = []
+ self.service_payloads_received = 0
+ self.service_completion_in_flight = None
+ self.service_completion_received = False
+ self.invocation_side_invocation_allowance = 1
+ self.invocation_side_service_allowance = 1
+ self.service_side_invocation_allowance = 1
+ self.service_side_service_allowance = 1
+ self.invocation_allowance_in_flight = 0
+ self.service_allowance_in_flight = 0
+ self.invocation_side_outcome = None
+ self.service_side_outcome = None
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
new file mode 100644
index 0000000000..5c8b176da4
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
@@ -0,0 +1,262 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of the base interface of RPC Framework."""
+
+import logging
+import random
+import threading
+import time
+import unittest
+
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.base import utilities
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import _control
+from grpc_test.framework.interfaces.base import test_interfaces
+
+_SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True))
+
+_EMPTY_OUTCOME_DICT = {outcome: 0 for outcome in base.Outcome}
+
+
+class _Serialization(test_interfaces.Serialization):
+
+ def serialize_request(self, request):
+ return request + request
+
+ def deserialize_request(self, serialized_request):
+ return serialized_request[:len(serialized_request) / 2]
+
+ def serialize_response(self, response):
+ return response * 3
+
+ def deserialize_response(self, serialized_response):
+ return serialized_response[2 * len(serialized_response) / 3:]
+
+
+def _advance(quadruples, operator, controller):
+ try:
+ for quadruple in quadruples:
+ operator.advance(
+ initial_metadata=quadruple[0], payload=quadruple[1],
+ completion=quadruple[2], allowance=quadruple[3])
+ except Exception as e: # pylint: disable=broad-except
+ controller.failed('Exception on advance: %e' % e)
+
+
+class _Operator(base.Operator):
+
+ def __init__(self, controller, on_advance, pool, operator_under_test):
+ self._condition = threading.Condition()
+ self._controller = controller
+ self._on_advance = on_advance
+ self._pool = pool
+ self._operator_under_test = operator_under_test
+ self._pending_advances = []
+
+ def set_operator_under_test(self, operator_under_test):
+ with self._condition:
+ self._operator_under_test = operator_under_test
+ pent_advances = self._pending_advances
+ self._pending_advances = []
+ pool = self._pool
+ controller = self._controller
+
+ if pool is None:
+ _advance(pent_advances, operator_under_test, controller)
+ else:
+ pool.submit(_advance, pent_advances, operator_under_test, controller)
+
+ def advance(
+ self, initial_metadata=None, payload=None, completion=None,
+ allowance=None):
+ on_advance = self._on_advance(
+ initial_metadata, payload, completion, allowance)
+ if on_advance.kind is _control.OnAdvance.Kind.ADVANCE:
+ with self._condition:
+ pool = self._pool
+ operator_under_test = self._operator_under_test
+ controller = self._controller
+
+ quadruple = (
+ on_advance.initial_metadata, on_advance.payload,
+ on_advance.completion, on_advance.allowance)
+ if pool is None:
+ _advance((quadruple,), operator_under_test, controller)
+ else:
+ pool.submit(_advance, (quadruple,), operator_under_test, controller)
+ elif on_advance.kind is _control.OnAdvance.Kind.DEFECT:
+ raise ValueError(
+ 'Deliberately raised exception from Operator.advance (in a test)!')
+
+
+class _Servicer(base.Servicer):
+ """An base.Servicer with instrumented for testing."""
+
+ def __init__(self, group, method, controllers, pool):
+ self._condition = threading.Condition()
+ self._group = group
+ self._method = method
+ self._pool = pool
+ self._controllers = list(controllers)
+
+ def service(self, group, method, context, output_operator):
+ with self._condition:
+ controller = self._controllers.pop(0)
+ if group != self._group or method != self._method:
+ controller.fail(
+ '%s != %s or %s != %s' % (group, self._group, method, self._method))
+ raise base.NoSuchMethodError()
+ else:
+ operator = _Operator(
+ controller, controller.on_service_advance, self._pool,
+ output_operator)
+ outcome = context.add_termination_callback(
+ controller.service_on_termination)
+ if outcome is not None:
+ controller.service_on_termination(outcome)
+ return utilities.full_subscription(operator)
+
+
+class _OperationTest(unittest.TestCase):
+
+ def setUp(self):
+ if self._synchronicity_variation:
+ self._pool = logging_pool.pool(test_constants.POOL_SIZE)
+ else:
+ self._pool = None
+ self._controller = self._controller_creator.controller(
+ self._implementation, self._randomness)
+
+ def tearDown(self):
+ if self._synchronicity_variation:
+ self._pool.shutdown(wait=True)
+ else:
+ self._pool = None
+
+ def test_operation(self):
+ invocation = self._controller.invocation()
+ if invocation.subscription_kind is base.Subscription.Kind.FULL:
+ test_operator = _Operator(
+ self._controller, self._controller.on_invocation_advance,
+ self._pool, None)
+ subscription = utilities.full_subscription(test_operator)
+ else:
+ # TODO(nathaniel): support and test other subscription kinds.
+ self.fail('Non-full subscriptions not yet supported!')
+
+ servicer = _Servicer(
+ invocation.group, invocation.method, (self._controller,), self._pool)
+
+ invocation_end, service_end, memo = self._implementation.instantiate(
+ {(invocation.group, invocation.method): _Serialization()}, servicer)
+
+ try:
+ invocation_end.start()
+ service_end.start()
+ operation_context, operator_under_test = invocation_end.operate(
+ invocation.group, invocation.method, subscription, invocation.timeout,
+ initial_metadata=invocation.initial_metadata, payload=invocation.payload,
+ completion=invocation.completion)
+ test_operator.set_operator_under_test(operator_under_test)
+ outcome = operation_context.add_termination_callback(
+ self._controller.invocation_on_termination)
+ if outcome is not None:
+ self._controller.invocation_on_termination(outcome)
+ except Exception as e: # pylint: disable=broad-except
+ self._controller.failed('Exception on invocation: %s' % e)
+ self.fail(e)
+
+ while True:
+ instruction = self._controller.poll()
+ if instruction.kind is _control.Instruction.Kind.ADVANCE:
+ try:
+ test_operator.advance(
+ *instruction.advance_args, **instruction.advance_kwargs)
+ except Exception as e: # pylint: disable=broad-except
+ self._controller.failed('Exception on instructed advance: %s' % e)
+ elif instruction.kind is _control.Instruction.Kind.CANCEL:
+ try:
+ operation_context.cancel()
+ except Exception as e: # pylint: disable=broad-except
+ self._controller.failed('Exception on cancel: %s' % e)
+ elif instruction.kind is _control.Instruction.Kind.CONCLUDE:
+ break
+
+ invocation_stop_event = invocation_end.stop(0)
+ service_stop_event = service_end.stop(0)
+ invocation_stop_event.wait()
+ service_stop_event.wait()
+ invocation_stats = invocation_end.operation_stats()
+ service_stats = service_end.operation_stats()
+
+ self._implementation.destantiate(memo)
+
+ self.assertTrue(
+ instruction.conclude_success, msg=instruction.conclude_message)
+
+ expected_invocation_stats = dict(_EMPTY_OUTCOME_DICT)
+ expected_invocation_stats[instruction.conclude_invocation_outcome] += 1
+ self.assertDictEqual(expected_invocation_stats, invocation_stats)
+ expected_service_stats = dict(_EMPTY_OUTCOME_DICT)
+ expected_service_stats[instruction.conclude_service_outcome] += 1
+ self.assertDictEqual(expected_service_stats, service_stats)
+
+
+def test_cases(implementation):
+ """Creates unittest.TestCase classes for a given Base implementation.
+
+ Args:
+ implementation: A test_interfaces.Implementation specifying creation and
+ destruction of the Base implementation under test.
+
+ Returns:
+ A sequence of subclasses of unittest.TestCase defining tests of the
+ specified Base layer implementation.
+ """
+ random_seed = hash(time.time())
+ logging.warning('Random seed for this execution: %s', random_seed)
+ randomness = random.Random(x=random_seed)
+
+ test_case_classes = []
+ for synchronicity_variation in _SYNCHRONICITY_VARIATION:
+ for controller_creator in _control.CONTROLLER_CREATORS:
+ name = ''.join(
+ (synchronicity_variation[0], controller_creator.name(), 'Test',))
+ test_case_classes.append(
+ type(name, (_OperationTest,),
+ {'_implementation': implementation,
+ '_randomness': randomness,
+ '_synchronicity_variation': synchronicity_variation[1],
+ '_controller_creator': controller_creator,
+ }))
+
+ return test_case_classes
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py
new file mode 100644
index 0000000000..02426ab846
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py
@@ -0,0 +1,186 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces used in tests of implementations of the Base layer."""
+
+import abc
+
+from grpc.framework.interfaces.base import base # pylint: disable=unused-import
+
+
+class Serialization(object):
+ """Specifies serialization and deserialization of test payloads."""
+ __metaclass__ = abc.ABCMeta
+
+ def serialize_request(self, request):
+ """Serializes a request value used in a test.
+
+ Args:
+ request: A request value created by a test.
+
+ Returns:
+ A bytestring that is the serialization of the given request.
+ """
+ raise NotImplementedError()
+
+ def deserialize_request(self, serialized_request):
+ """Deserializes a request value used in a test.
+
+ Args:
+ serialized_request: A bytestring that is the serialization of some request
+ used in a test.
+
+ Returns:
+ The request value encoded by the given bytestring.
+ """
+ raise NotImplementedError()
+
+ def serialize_response(self, response):
+ """Serializes a response value used in a test.
+
+ Args:
+ response: A response value created by a test.
+
+ Returns:
+ A bytestring that is the serialization of the given response.
+ """
+ raise NotImplementedError()
+
+ def deserialize_response(self, serialized_response):
+ """Deserializes a response value used in a test.
+
+ Args:
+ serialized_response: A bytestring that is the serialization of some
+ response used in a test.
+
+ Returns:
+ The response value encoded by the given bytestring.
+ """
+ raise NotImplementedError()
+
+
+class Implementation(object):
+ """Specifies an implementation of the Base layer."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def instantiate(self, serializations, servicer):
+ """Instantiates the Base layer implementation to be used in a test.
+
+ Args:
+ serializations: A dict from group-method pair to Serialization object
+ specifying how to serialize and deserialize payload values used in the
+ test.
+ servicer: A base.Servicer object to be called to service RPCs made during
+ the test.
+
+ Returns:
+ A sequence of length three the first element of which is a
+ base.End to be used to invoke RPCs, the second element of which is a
+ base.End to be used to service invoked RPCs, and the third element of
+ which is an arbitrary memo object to be kept and passed to destantiate
+ at the conclusion of the test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def destantiate(self, memo):
+ """Destroys the Base layer implementation under test.
+
+ Args:
+ memo: The object from the third position of the return value of a call to
+ instantiate.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation_initial_metadata(self):
+ """Provides an operation's invocation-side initial metadata.
+
+ Returns:
+ A value to use for an operation's invocation-side initial metadata, or
+ None.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def service_initial_metadata(self):
+ """Provices an operation's service-side initial metadata.
+
+ Returns:
+ A value to use for an operation's service-side initial metadata, or
+ None.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation_completion(self):
+ """Provides an operation's invocation-side completion.
+
+ Returns:
+ A base.Completion to use for an operation's invocation-side completion.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def service_completion(self):
+ """Provides an operation's service-side completion.
+
+ Returns:
+ A base.Completion to use for an operation's service-side completion.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ """Identifies whether or not metadata was properly transmitted.
+
+ Args:
+ original_metadata: A metadata value passed to the system under test.
+ transmitted_metadata: The same metadata value after having been
+ transmitted through the system under test.
+
+ Returns:
+ Whether or not the metadata was properly transmitted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def completion_transmitted(self, original_completion, transmitted_completion):
+ """Identifies whether or not a base.Completion was properly transmitted.
+
+ Args:
+ original_completion: A base.Completion passed to the system under test.
+ transmitted_completion: The same completion value after having been
+ transmitted through the system under test.
+
+ Returns:
+ Whether or not the completion was properly transmitted.
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
index 26ca035c44..1e575d1a9e 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
@@ -303,16 +303,9 @@ class TransmissionTest(object):
invocation_message, links.Ticket.Termination.COMPLETION)
self._invocation_link.accept_ticket(original_invocation_ticket)
- # TODO(nathaniel): This shouldn't be necessary. Detecting the end of the
- # invocation-side ticket sequence shouldn't require granting allowance for
- # another payload.
self._service_mate.block_until_tickets_satisfy(
at_least_n_payloads_received_predicate(1))
service_operation_id = self._service_mate.tickets()[0].operation_id
- self._service_link.accept_ticket(
- links.Ticket(
- service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
- None, 1, None, None, None, None, None, None))
self._service_mate.block_until_tickets_satisfy(terminated)
self._assert_is_valid_invocation_sequence(
@@ -321,7 +314,7 @@ class TransmissionTest(object):
invocation_terminal_metadata, links.Ticket.Termination.COMPLETION)
original_service_ticket = links.Ticket(
- service_operation_id, 1, None, None, links.Ticket.Subscription.FULL,
+ service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
timeout, 0, service_initial_metadata, service_payload,
service_terminal_metadata, service_code, service_message,
links.Ticket.Termination.COMPLETION)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
index 6c2e3346aa..a2bd7107c1 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
@@ -29,9 +29,42 @@
"""State and behavior appropriate for use in tests."""
+import logging
import threading
+import time
from grpc.framework.interfaces.links import links
+from grpc.framework.interfaces.links import utilities
+
+# A more-or-less arbitrary limit on the length of raw data values to be logged.
+_UNCOMFORTABLY_LONG = 48
+
+
+def _safe_for_log_ticket(ticket):
+ """Creates a safe-for-printing-to-the-log ticket for a given ticket.
+
+ Args:
+ ticket: Any links.Ticket.
+
+ Returns:
+ A links.Ticket that is as much as can be equal to the given ticket but
+ possibly features values like the string "<payload of length 972321>" in
+ place of the actual values of the given ticket.
+ """
+ if isinstance(ticket.payload, (basestring,)):
+ payload_length = len(ticket.payload)
+ else:
+ payload_length = -1
+ if payload_length < _UNCOMFORTABLY_LONG:
+ return ticket
+ else:
+ return links.Ticket(
+ ticket.operation_id, ticket.sequence_number,
+ ticket.group, ticket.method, ticket.subscription, ticket.timeout,
+ ticket.allowance, ticket.initial_metadata,
+ '<payload of length {}>'.format(payload_length),
+ ticket.terminal_metadata, ticket.code, ticket.message,
+ ticket.termination)
class RecordingLink(links.Link):
@@ -64,3 +97,71 @@ class RecordingLink(links.Link):
"""Returns a copy of the list of all tickets received by this Link."""
with self._condition:
return tuple(self._tickets)
+
+
+class _Pipe(object):
+ """A conduit that logs all tickets passed through it."""
+
+ def __init__(self, name):
+ self._lock = threading.Lock()
+ self._name = name
+ self._left_mate = utilities.NULL_LINK
+ self._right_mate = utilities.NULL_LINK
+
+ def accept_left_to_right_ticket(self, ticket):
+ with self._lock:
+ logging.warning(
+ '%s: moving left to right through %s: %s', time.time(), self._name,
+ _safe_for_log_ticket(ticket))
+ try:
+ self._right_mate.accept_ticket(ticket)
+ except Exception as e: # pylint: disable=broad-except
+ logging.exception(e)
+
+ def accept_right_to_left_ticket(self, ticket):
+ with self._lock:
+ logging.warning(
+ '%s: moving right to left through %s: %s', time.time(), self._name,
+ _safe_for_log_ticket(ticket))
+ try:
+ self._left_mate.accept_ticket(ticket)
+ except Exception as e: # pylint: disable=broad-except
+ logging.exception(e)
+
+ def join_left_mate(self, left_mate):
+ with self._lock:
+ self._left_mate = utilities.NULL_LINK if left_mate is None else left_mate
+
+ def join_right_mate(self, right_mate):
+ with self._lock:
+ self._right_mate = (
+ utilities.NULL_LINK if right_mate is None else right_mate)
+
+
+class _Facade(links.Link):
+
+ def __init__(self, accept, join):
+ self._accept = accept
+ self._join = join
+
+ def accept_ticket(self, ticket):
+ self._accept(ticket)
+
+ def join_link(self, link):
+ self._join(link)
+
+
+def logging_links(name):
+ """Creates a conduit that logs all tickets passed through it.
+
+ Args:
+ name: A name to use for the conduit to identify itself in logging output.
+
+ Returns:
+ Two links.Links, the first of which is the "left" side of the conduit
+ and the second of which is the "right" side of the conduit.
+ """
+ pipe = _Pipe(name)
+ left_facade = _Facade(pipe.accept_left_to_right_ticket, pipe.join_left_mate)
+ right_facade = _Facade(pipe.accept_right_to_left_ticket, pipe.join_right_mate)
+ return left_facade, right_facade
diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py
index 925c32720f..a6203cae2d 100644
--- a/src/python/grpcio_test/setup.py
+++ b/src/python/grpcio_test/setup.py
@@ -48,8 +48,13 @@ _PACKAGE_DIRECTORIES = {
_PACKAGE_DATA = {
'grpc_interop': [
- 'credentials/ca.pem', 'credentials/server1.key',
- 'credentials/server1.pem',]
+ 'credentials/ca.pem',
+ 'credentials/server1.key',
+ 'credentials/server1.pem',
+ ],
+ 'grpc_protoc_plugin': [
+ 'test.proto',
+ ],
}
_SETUP_REQUIRES = (
@@ -75,5 +80,5 @@ setuptools.setup(
package_data=_PACKAGE_DATA,
install_requires=_INSTALL_REQUIRES + _SETUP_REQUIRES,
setup_requires=_SETUP_REQUIRES,
- cmdclass=_COMMAND_CLASS
+ cmdclass=_COMMAND_CLASS,
)