diff options
Diffstat (limited to 'src/python')
63 files changed, 5979 insertions, 142 deletions
diff --git a/src/python/README.md b/src/python/README.md index 2beb3a913a..de0142db05 100644 --- a/src/python/README.md +++ b/src/python/README.md @@ -9,12 +9,36 @@ Alpha : Ready for early adopters PREREQUISITES ------------- - Python 2.7, virtualenv, pip -- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core. +- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core. INSTALLATION ------------- -On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. -Run the following command to install gRPC Python. + +**Linux (Debian):** + +Add [Debian unstable][] to your `sources.list` file. Example: + +```sh +echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \ +sudo tee -a /etc/apt/sources.list +``` + +Install the gRPC Debian package + +```sh +sudo apt-get update +sudo apt-get install libgrpc-dev +``` + +Install the gRPC Python module + +```sh +sudo pip install grpcio +``` + +**Mac OS X** + +Install [homebrew][]. Run the following command to install gRPC Python. ```sh $ curl -fsSL https://goo.gl/getgrpc | bash -s python ``` @@ -27,11 +51,6 @@ Please read our online documentation for a [Quick Start][] and a [detailed examp BUILDING FROM SOURCE --------------------- - Clone this repository -- Build the gRPC core from the root of the - [gRPC Git repository](https://github.com/grpc/grpc) -``` -$ make shared_c static_c -``` - Use build_python.sh to build the Python code and install it into a virtual environment ``` @@ -60,7 +79,7 @@ $ ../../tools/distrib/python/submit.py ``` [homebrew]:http://brew.sh -[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install [Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html [detailed example]:http://www.grpc.io/docs/installation/python.html +[Debian unstable]:https://www.debian.org/releases/sid/ diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst index 00bdecf56f..c7b5a3bde4 100644 --- a/src/python/grpcio/README.rst +++ b/src/python/grpcio/README.rst @@ -6,7 +6,7 @@ Package for GRPC Python. Dependencies ------------ -Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. On Linux, install linuxbrew_. +Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. Run the following command to install gRPC Python. :: @@ -19,5 +19,4 @@ Otherwise, `install from source`_ .. _`install from source`: https://github.com/grpc/grpc/blob/master/src/python/README.md#building-from-source .. _homebrew: http://brew.sh -.. _linuxbrew: https://github.com/Homebrew/linuxbrew#installation .. _`gRPC install script`: https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install diff --git a/src/python/grpcio/grpc/_adapter/_c/module.c b/src/python/grpcio/grpc/_adapter/_c/module.c index 1f3aedd9d8..9b93b051f6 100644 --- a/src/python/grpcio/grpc/_adapter/_c/module.c +++ b/src/python/grpcio/grpc/_adapter/_c/module.c @@ -53,6 +53,12 @@ PyMODINIT_FUNC init_c(void) { return; } + if (PyModule_AddStringConstant( + module, "PRIMARY_USER_AGENT_KEY", + GRPC_ARG_PRIMARY_USER_AGENT_STRING) < 0) { + return; + } + /* GRPC maintains an internal counter of how many times it has been initialized and handles multiple pairs of grpc_init()/grpc_shutdown() invocations accordingly. */ diff --git a/src/python/grpcio/grpc/_adapter/_c/types.h b/src/python/grpcio/grpc/_adapter/_c/types.h index 4e0da4a28a..f646465c63 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types.h +++ b/src/python/grpcio/grpc/_adapter/_c/types.h @@ -113,6 +113,7 @@ Call *pygrpc_Call_new_empty(CompletionQueue *cq); void pygrpc_Call_dealloc(Call *self); PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs); PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs); +PyObject *pygrpc_Call_peer(Call *self); extern PyTypeObject pygrpc_Call_type; @@ -129,6 +130,11 @@ Channel *pygrpc_Channel_new( void pygrpc_Channel_dealloc(Channel *self); Call *pygrpc_Channel_create_call( Channel *self, PyObject *args, PyObject *kwargs); +PyObject *pygrpc_Channel_check_connectivity_state(Channel *self, PyObject *args, + PyObject *kwargs); +PyObject *pygrpc_Channel_watch_connectivity_state(Channel *self, PyObject *args, + PyObject *kwargs); +PyObject *pygrpc_Channel_target(Channel *self); extern PyTypeObject pygrpc_Channel_type; @@ -181,6 +187,9 @@ pygrpc_tag *pygrpc_produce_request_tag(PyObject *user_tag, Call *empty_call); /* Construct a tag associated with a server shutdown. */ pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag); +/* Construct a tag associated with a channel state change. */ +pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag); + /* Frees all resources owned by the tag and the tag itself. */ void pygrpc_discard_tag(pygrpc_tag *tag); diff --git a/src/python/grpcio/grpc/_adapter/_c/types/call.c b/src/python/grpcio/grpc/_adapter/_c/types/call.c index 0739070044..42a50151f6 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/call.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/call.c @@ -42,6 +42,7 @@ PyMethodDef pygrpc_Call_methods[] = { {"start_batch", (PyCFunction)pygrpc_Call_start_batch, METH_KEYWORDS, ""}, {"cancel", (PyCFunction)pygrpc_Call_cancel, METH_KEYWORDS, ""}, + {"peer", (PyCFunction)pygrpc_Call_peer, METH_NOARGS, ""}, {NULL} }; const char pygrpc_Call_doc[] = "See grpc._adapter._types.Call."; @@ -131,7 +132,7 @@ PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs) } } tag = pygrpc_produce_batch_tag(user_tag, self, ops, nops); - errcode = grpc_call_start_batch(self->c_call, tag->ops, tag->nops, tag); + errcode = grpc_call_start_batch(self->c_call, tag->ops, tag->nops, tag, NULL); gpr_free(ops); return PyInt_FromLong(errcode); } @@ -151,13 +152,20 @@ PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs) { return NULL; } code = PyInt_AsLong(py_code); - errcode = grpc_call_cancel_with_status(self->c_call, code, details); + errcode = grpc_call_cancel_with_status(self->c_call, code, details, NULL); } else if (py_code != NULL || details != NULL) { PyErr_SetString(PyExc_ValueError, "if `code` is specified, so must `details`"); return NULL; } else { - errcode = grpc_call_cancel(self->c_call); + errcode = grpc_call_cancel(self->c_call, NULL); } return PyInt_FromLong(errcode); } + +PyObject *pygrpc_Call_peer(Call *self) { + char *peer = grpc_call_get_peer(self->c_call); + PyObject *py_peer = PyString_FromString(peer); + gpr_free(peer); + return py_peer; +} diff --git a/src/python/grpcio/grpc/_adapter/_c/types/channel.c b/src/python/grpcio/grpc/_adapter/_c/types/channel.c index 963104742f..c577ac05eb 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/channel.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/channel.c @@ -36,10 +36,14 @@ #define PY_SSIZE_T_CLEAN #include <Python.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> PyMethodDef pygrpc_Channel_methods[] = { {"create_call", (PyCFunction)pygrpc_Channel_create_call, METH_KEYWORDS, ""}, + {"check_connectivity_state", (PyCFunction)pygrpc_Channel_check_connectivity_state, METH_KEYWORDS, ""}, + {"watch_connectivity_state", (PyCFunction)pygrpc_Channel_watch_connectivity_state, METH_KEYWORDS, ""}, + {"target", (PyCFunction)pygrpc_Channel_target, METH_NOARGS, ""}, {NULL} }; const char pygrpc_Channel_doc[] = "See grpc._adapter._types.Channel."; @@ -104,7 +108,7 @@ Channel *pygrpc_Channel_new( if (creds) { self->c_chan = grpc_secure_channel_create(creds->c_creds, target, &c_args); } else { - self->c_chan = grpc_insecure_channel_create(target, &c_args); + self->c_chan = grpc_insecure_channel_create(target, &c_args, NULL); } pygrpc_discard_channel_args(c_args); return self; @@ -122,13 +126,61 @@ Call *pygrpc_Channel_create_call( const char *host; double deadline; char *keywords[] = {"cq", "method", "host", "deadline", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!ssd:create_call", keywords, + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!szd:create_call", keywords, &pygrpc_CompletionQueue_type, &cq, &method, &host, &deadline)) { return NULL; } call = pygrpc_Call_new_empty(cq); call->c_call = grpc_channel_create_call( self->c_chan, NULL, GRPC_PROPAGATE_DEFAULTS, cq->c_cq, method, host, - pygrpc_cast_double_to_gpr_timespec(deadline)); + pygrpc_cast_double_to_gpr_timespec(deadline), NULL); return call; } + +PyObject *pygrpc_Channel_check_connectivity_state( + Channel *self, PyObject *args, PyObject *kwargs) { + PyObject *py_try_to_connect; + int try_to_connect; + char *keywords[] = {"try_to_connect", NULL}; + grpc_connectivity_state state; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:connectivity_state", keywords, + &py_try_to_connect)) { + return NULL; + } + if (!PyBool_Check(py_try_to_connect)) { + Py_XDECREF(py_try_to_connect); + return NULL; + } + try_to_connect = Py_True == py_try_to_connect; + Py_DECREF(py_try_to_connect); + state = grpc_channel_check_connectivity_state(self->c_chan, try_to_connect); + return PyInt_FromLong(state); +} + +PyObject *pygrpc_Channel_watch_connectivity_state( + Channel *self, PyObject *args, PyObject *kwargs) { + PyObject *tag; + double deadline; + int last_observed_state; + CompletionQueue *completion_queue; + char *keywords[] = {"last_observed_state", "deadline", + "completion_queue", "tag"}; + if (!PyArg_ParseTupleAndKeywords( + args, kwargs, "idO!O:watch_connectivity_state", keywords, + &last_observed_state, &deadline, &pygrpc_CompletionQueue_type, + &completion_queue, &tag)) { + return NULL; + } + grpc_channel_watch_connectivity_state( + self->c_chan, (grpc_connectivity_state)last_observed_state, + pygrpc_cast_double_to_gpr_timespec(deadline), completion_queue->c_cq, + pygrpc_produce_channel_state_change_tag(tag)); + Py_RETURN_NONE; +} + +PyObject *pygrpc_Channel_target(Channel *self) { + char *target = grpc_channel_get_target(self->c_chan); + PyObject *py_target = PyString_FromString(target); + gpr_free(target); + return py_target; +} diff --git a/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c b/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c index 2dd44b6ddd..d8bb89ca4b 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c @@ -90,7 +90,7 @@ PyTypeObject pygrpc_CompletionQueue_type = { CompletionQueue *pygrpc_CompletionQueue_new( PyTypeObject *type, PyObject *args, PyObject *kwargs) { CompletionQueue *self = (CompletionQueue *)type->tp_alloc(type, 0); - self->c_cq = grpc_completion_queue_create(); + self->c_cq = grpc_completion_queue_create(NULL); return self; } @@ -111,7 +111,7 @@ PyObject *pygrpc_CompletionQueue_next( } Py_BEGIN_ALLOW_THREADS; event = grpc_completion_queue_next( - self->c_cq, pygrpc_cast_double_to_gpr_timespec(deadline)); + self->c_cq, pygrpc_cast_double_to_gpr_timespec(deadline), NULL); Py_END_ALLOW_THREADS; transliterated_event = pygrpc_consume_event(event); return transliterated_event; diff --git a/src/python/grpcio/grpc/_adapter/_c/types/server.c b/src/python/grpcio/grpc/_adapter/_c/types/server.c index c2190ea672..15c98f28eb 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/server.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/server.c @@ -96,7 +96,7 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) PyObject *py_args; grpc_channel_args c_args; char *keywords[] = {"cq", "args", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O:Channel", keywords, + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O:Server", keywords, &pygrpc_CompletionQueue_type, &cq, &py_args)) { return NULL; } @@ -104,8 +104,8 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) return NULL; } self = (Server *)type->tp_alloc(type, 0); - self->c_serv = grpc_server_create(&c_args); - grpc_server_register_completion_queue(self->c_serv, cq->c_cq); + self->c_serv = grpc_server_create(&c_args, NULL); + grpc_server_register_completion_queue(self->c_serv, cq->c_cq, NULL); pygrpc_discard_channel_args(c_args); self->cq = cq; Py_INCREF(self->cq); diff --git a/src/python/grpcio/grpc/_adapter/_c/utility.c b/src/python/grpcio/grpc/_adapter/_c/utility.c index 51f3c9be01..590f7e013a 100644 --- a/src/python/grpcio/grpc/_adapter/_c/utility.c +++ b/src/python/grpcio/grpc/_adapter/_c/utility.c @@ -88,6 +88,19 @@ pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag) { return tag; } +pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag) { + pygrpc_tag *tag = gpr_malloc(sizeof(pygrpc_tag)); + tag->user_tag = user_tag; + Py_XINCREF(tag->user_tag); + tag->call = NULL; + tag->ops = NULL; + tag->nops = 0; + grpc_call_details_init(&tag->request_call_details); + grpc_metadata_array_init(&tag->request_metadata); + tag->is_new_call = 0; + return tag; +} + void pygrpc_discard_tag(pygrpc_tag *tag) { if (!tag) { return; @@ -139,7 +152,7 @@ PyObject *pygrpc_consume_event(grpc_event event) { } int pygrpc_produce_op(PyObject *op, grpc_op *result) { - static const int OP_TUPLE_SIZE = 5; + static const int OP_TUPLE_SIZE = 6; static const int STATUS_TUPLE_SIZE = 2; static const int TYPE_INDEX = 0; static const int INITIAL_METADATA_INDEX = 1; @@ -148,6 +161,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) { static const int STATUS_INDEX = 4; static const int STATUS_CODE_INDEX = 0; static const int STATUS_DETAILS_INDEX = 1; + static const int WRITE_FLAGS_INDEX = 5; int type; Py_ssize_t message_size; char *message; @@ -170,7 +184,11 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) { return 0; } c_op.op = type; - c_op.flags = 0; + c_op.reserved = NULL; + c_op.flags = PyInt_AsLong(PyTuple_GET_ITEM(op, WRITE_FLAGS_INDEX)); + if (PyErr_Occurred()) { + return 0; + } switch (type) { case GRPC_OP_SEND_INITIAL_METADATA: if (!pygrpc_cast_pyseq_to_send_metadata( diff --git a/src/python/grpcio/grpc/_adapter/_intermediary_low.py b/src/python/grpcio/grpc/_adapter/_intermediary_low.py index 3c7f0a2619..e7bf9dc462 100644 --- a/src/python/grpcio/grpc/_adapter/_intermediary_low.py +++ b/src/python/grpcio/grpc/_adapter/_intermediary_low.py @@ -127,7 +127,7 @@ class Call(object): def write(self, message, tag): return self._internal.start_batch([ - _types.OpArgs.send_message(message) + _types.OpArgs.send_message(message, 0) ], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED)) def complete(self, tag): diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py index dcf67dbc11..147086e725 100644 --- a/src/python/grpcio/grpc/_adapter/_low.py +++ b/src/python/grpcio/grpc/_adapter/_low.py @@ -27,9 +27,12 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from grpc import _grpcio_metadata from grpc._adapter import _c from grpc._adapter import _types +_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__) + ClientCredentials = _c.ClientCredentials ServerCredentials = _c.ServerCredentials @@ -72,10 +75,14 @@ class Call(_types.Call): else: return self.call.cancel(code, details) + def peer(self): + return self.call.peer() + class Channel(_types.Channel): def __init__(self, target, args, creds=None): + args = list(args) + [(_c.PRIMARY_USER_AGENT_KEY, _USER_AGENT)] if creds is None: self.channel = _c.Channel(target, args) else: @@ -84,6 +91,17 @@ class Channel(_types.Channel): def create_call(self, completion_queue, method, host, deadline=None): return Call(self.channel.create_call(completion_queue.completion_queue, method, host, deadline)) + def check_connectivity_state(self, try_to_connect): + return self.channel.check_connectivity_state(try_to_connect) + + def watch_connectivity_state(self, last_observed_state, deadline, + completion_queue, tag): + self.channel.watch_connectivity_state( + last_observed_state, deadline, completion_queue.completion_queue, tag) + + def target(self): + return self.channel.target() + _NO_TAG = object() diff --git a/src/python/grpcio/grpc/_adapter/_types.py b/src/python/grpcio/grpc/_adapter/_types.py index 5ddb1774ea..5470d2de4a 100644 --- a/src/python/grpcio/grpc/_adapter/_types.py +++ b/src/python/grpcio/grpc/_adapter/_types.py @@ -31,13 +31,12 @@ import abc import collections import enum -# TODO(atash): decide whether or not to move these enums to the _c module to -# force build errors with upstream changes. class GrpcChannelArgumentKeys(enum.Enum): """Mirrors keys used in grpc_channel_args for GRPC-specific arguments.""" SSL_TARGET_NAME_OVERRIDE = 'grpc.ssl_target_name_override' + @enum.unique class CallError(enum.IntEnum): """Mirrors grpc_call_error in the C core.""" @@ -53,6 +52,7 @@ class CallError(enum.IntEnum): ERROR_INVALID_FLAGS = 9 ERROR_INVALID_METADATA = 10 + @enum.unique class StatusCode(enum.IntEnum): """Mirrors grpc_status_code in the C core.""" @@ -74,6 +74,14 @@ class StatusCode(enum.IntEnum): DATA_LOSS = 15 UNAUTHENTICATED = 16 + +@enum.unique +class OpWriteFlags(enum.IntEnum): + """Mirrors defined write-flag constants in the C core.""" + WRITE_BUFFER_HINT = 1 + WRITE_NO_COMPRESS = 2 + + @enum.unique class OpType(enum.IntEnum): """Mirrors grpc_op_type in the C core.""" @@ -86,12 +94,24 @@ class OpType(enum.IntEnum): RECV_STATUS_ON_CLIENT = 6 RECV_CLOSE_ON_SERVER = 7 + @enum.unique class EventType(enum.IntEnum): """Mirrors grpc_completion_type in the C core.""" - QUEUE_SHUTDOWN = 0 - QUEUE_TIMEOUT = 1 # if seen on the Python side, something went horridly wrong - OP_COMPLETE = 2 + QUEUE_SHUTDOWN = 0 + QUEUE_TIMEOUT = 1 # if seen on the Python side, something went horridly wrong + OP_COMPLETE = 2 + + +@enum.unique +class ConnectivityState(enum.IntEnum): + """Mirrors grpc_connectivity_state in the C core.""" + IDLE = 0 + CONNECTING = 1 + READY = 2 + TRANSIENT_FAILURE = 3 + FATAL_FAILURE = 4 + class Status(collections.namedtuple( 'Status', [ @@ -105,6 +125,7 @@ class Status(collections.namedtuple( details (str): ... """ + class CallDetails(collections.namedtuple( 'CallDetails', [ 'method', @@ -119,6 +140,7 @@ class CallDetails(collections.namedtuple( deadline (float): ... """ + class OpArgs(collections.namedtuple( 'OpArgs', [ 'type', @@ -126,6 +148,7 @@ class OpArgs(collections.namedtuple( 'trailing_metadata', 'message', 'status', + 'write_flags', ])): """Arguments passed into a GRPC operation. @@ -138,39 +161,40 @@ class OpArgs(collections.namedtuple( message (bytes): Only valid if type == OpType.SEND_MESSAGE, else is None. status (Status): Only valid if type == OpType.SEND_STATUS_FROM_SERVER, else is None. + write_flags (int): a bit OR'ing of 0 or more OpWriteFlags values. """ @staticmethod def send_initial_metadata(initial_metadata): - return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None) + return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None, 0) @staticmethod - def send_message(message): - return OpArgs(OpType.SEND_MESSAGE, None, None, message, None) + def send_message(message, flags): + return OpArgs(OpType.SEND_MESSAGE, None, None, message, None, flags) @staticmethod def send_close_from_client(): - return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None) + return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None, 0) @staticmethod def send_status_from_server(trailing_metadata, status_code, status_details): - return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details)) + return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details), 0) @staticmethod def recv_initial_metadata(): - return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None); + return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None, 0); @staticmethod def recv_message(): - return OpArgs(OpType.RECV_MESSAGE, None, None, None, None) + return OpArgs(OpType.RECV_MESSAGE, None, None, None, None, 0) @staticmethod def recv_status_on_client(): - return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None) + return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None, 0) @staticmethod def recv_close_on_server(): - return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None) + return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None, 0) class OpResult(collections.namedtuple( @@ -290,6 +314,15 @@ class Call: """ return CallError.ERROR + @abc.abstractmethod + def peer(self): + """Get the peer of this call. + + Returns: + str: the peer of this call. + """ + return None + class Channel: __metaclass__ = abc.ABCMeta @@ -321,6 +354,40 @@ class Channel: """ return None + @abc.abstractmethod + def check_connectivity_state(self, try_to_connect): + """Check and optionally repair the connectivity state of the channel. + + Args: + try_to_connect (bool): whether or not to try to connect the channel if + disconnected. + + Returns: + ConnectivityState: state of the channel at the time of this invocation. + """ + return None + + @abc.abstractmethod + def watch_connectivity_state(self, last_observed_state, deadline, + completion_queue, tag): + """Watch for connectivity state changes from the last_observed_state. + + Args: + last_observed_state (ConnectivityState): ... + deadline (float): ... + completion_queue (CompletionQueue): ... + tag (object) ... + """ + + @abc.abstractmethod + def target(self): + """Get the target of this channel. + + Returns: + str: the target of this channel. + """ + return None + class Server: __metaclass__ = abc.ABCMeta diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py index 7783e91824..5c636d61ab 100644 --- a/src/python/grpcio/grpc/_links/service.py +++ b/src/python/grpcio/grpc/_links/service.py @@ -44,7 +44,10 @@ from grpc.framework.interfaces.links import links @enum.unique class _Read(enum.Enum): READING = 'reading' - AWAITING_ALLOWANCE = 'awaiting allowance' + # TODO(issue 2916): This state will again be necessary after eliminating the + # "early_read" field of _RPCState and going back to only reading when granted + # allowance to read. + # AWAITING_ALLOWANCE = 'awaiting allowance' CLOSED = 'closed' @@ -67,12 +70,15 @@ class _RPCState(object): def __init__( self, request_deserializer, response_serializer, sequence_number, read, - allowance, high_write, low_write, premetadataed, terminal_metadata, code, - message): + early_read, allowance, high_write, low_write, premetadataed, + terminal_metadata, code, message): self.request_deserializer = request_deserializer self.response_serializer = response_serializer self.sequence_number = sequence_number self.read = read + # TODO(issue 2916): Eliminate this by eliminating the necessity of calling + # call.read just to advance the RPC. + self.early_read = early_read # A raw (not deserialized) read. self.allowance = allowance self.high_write = high_write self.low_write = low_write @@ -120,7 +126,7 @@ class _Kernel(object): call.read(call) self._rpc_states[call] = _RPCState( - request_deserializer, response_serializer, 1, _Read.READING, 0, + request_deserializer, response_serializer, 1, _Read.READING, None, 1, _HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None) ticket = links.Ticket( call, 0, group, method, links.Ticket.Subscription.FULL, @@ -140,12 +146,15 @@ class _Kernel(object): termination = links.Ticket.Termination.COMPLETION else: if 0 < rpc_state.allowance: + payload = rpc_state.request_deserializer(event.bytes) + termination = None rpc_state.allowance -= 1 call.read(call) else: - rpc_state.read = _Read.AWAITING_ALLOWANCE - payload = rpc_state.request_deserializer(event.bytes) - termination = None + rpc_state.early_read = event.bytes + return + # TODO(issue 2916): Instead of returning: + # rpc_state.read = _Read.AWAITING_ALLOWANCE ticket = links.Ticket( call, rpc_state.sequence_number, None, None, None, None, None, None, payload, None, None, None, termination) @@ -237,12 +246,22 @@ class _Kernel(object): rpc_state.premetadataed = True if ticket.allowance is not None: - if rpc_state.read is _Read.AWAITING_ALLOWANCE: - rpc_state.allowance += ticket.allowance - 1 - call.read(call) - rpc_state.read = _Read.READING - else: + if rpc_state.early_read is None: rpc_state.allowance += ticket.allowance + else: + payload = rpc_state.request_deserializer(rpc_state.early_read) + rpc_state.allowance += ticket.allowance - 1 + rpc_state.early_read = None + if rpc_state.read is _Read.READING: + call.read(call) + termination = None + else: + termination = links.Ticket.Termination.COMPLETION + ticket = links.Ticket( + call, rpc_state.sequence_number, None, None, None, None, None, + None, payload, None, None, None, termination) + rpc_state.sequence_number += 1 + self._relay.add_value(ticket) if ticket.payload is not None: call.write(rpc_state.response_serializer(ticket.payload), call) diff --git a/src/python/grpcio/grpc/early_adopter/implementations.py b/src/python/grpcio/grpc/early_adopter/implementations.py index 10919fae69..9c396aa7ad 100644 --- a/src/python/grpcio/grpc/early_adopter/implementations.py +++ b/src/python/grpcio/grpc/early_adopter/implementations.py @@ -41,13 +41,15 @@ from grpc.framework.base import util as _base_utilities from grpc.framework.face import implementations as _face_implementations from grpc.framework.foundation import logging_pool -_THREAD_POOL_SIZE = 8 +_DEFAULT_THREAD_POOL_SIZE = 8 _ONE_DAY_IN_SECONDS = 24 * 60 * 60 class _Server(interfaces.Server): - def __init__(self, breakdown, port, private_key, certificate_chain): + def __init__( + self, breakdown, port, private_key, certificate_chain, + thread_pool_size=_DEFAULT_THREAD_POOL_SIZE): self._lock = threading.Lock() self._breakdown = breakdown self._port = port @@ -56,6 +58,7 @@ class _Server(interfaces.Server): else: self._key_chain_pairs = ((private_key, certificate_chain),) + self._pool_size = thread_pool_size self._pool = None self._back = None self._fore_link = None @@ -63,7 +66,7 @@ class _Server(interfaces.Server): def _start(self): with self._lock: if self._pool is None: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + self._pool = logging_pool.pool(self._pool_size) servicer = _face_implementations.servicer( self._pool, self._breakdown.implementations, None) self._back = _base_implementations.back_link( @@ -114,7 +117,8 @@ class _Stub(interfaces.Stub): def __init__( self, breakdown, host, port, secure, root_certificates, private_key, - certificate_chain, metadata_transformer=None, server_host_override=None): + certificate_chain, metadata_transformer=None, server_host_override=None, + thread_pool_size=_DEFAULT_THREAD_POOL_SIZE): self._lock = threading.Lock() self._breakdown = breakdown self._host = host @@ -126,6 +130,7 @@ class _Stub(interfaces.Stub): self._metadata_transformer = metadata_transformer self._server_host_override = server_host_override + self._pool_size = thread_pool_size self._pool = None self._front = None self._rear_link = None @@ -134,7 +139,7 @@ class _Stub(interfaces.Stub): def __enter__(self): with self._lock: if self._pool is None: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + self._pool = logging_pool.pool(self._pool_size) self._front = _base_implementations.front_link( self._pool, self._pool, self._pool) self._rear_link = _rear.RearLink( @@ -193,7 +198,7 @@ class _Stub(interfaces.Stub): def stub( service_name, methods, host, port, metadata_transformer=None, secure=False, root_certificates=None, private_key=None, certificate_chain=None, - server_host_override=None): + server_host_override=None, thread_pool_size=_DEFAULT_THREAD_POOL_SIZE): """Constructs an interfaces.Stub. Args: @@ -216,6 +221,8 @@ def stub( certificate chain should be used. server_host_override: (For testing only) the target name used for SSL host name checking. + thread_pool_size: The maximum number of threads to allow in the backing + thread pool. Returns: An interfaces.Stub affording RPC invocation. @@ -224,11 +231,13 @@ def stub( return _Stub( breakdown, host, port, secure, root_certificates, private_key, certificate_chain, server_host_override=server_host_override, - metadata_transformer=metadata_transformer) + metadata_transformer=metadata_transformer, + thread_pool_size=thread_pool_size) def server( - service_name, methods, port, private_key=None, certificate_chain=None): + service_name, methods, port, private_key=None, certificate_chain=None, + thread_pool_size=_DEFAULT_THREAD_POOL_SIZE): """Constructs an interfaces.Server. Args: @@ -242,9 +251,12 @@ def server( private_key: A pem-encoded private key, or None for an insecure server. certificate_chain: A pem-encoded certificate chain, or None for an insecure server. + thread_pool_size: The maximum number of threads to allow in the backing + thread pool. Returns: An interfaces.Server that will serve secure traffic. """ breakdown = _face_utilities.break_down_service(service_name, methods) - return _Server(breakdown, port, private_key, certificate_chain) + return _Server(breakdown, port, private_key, certificate_chain, + thread_pool_size=thread_pool_size) diff --git a/src/python/grpcio/grpc/framework/core/__init__.py b/src/python/grpcio/grpc/framework/core/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py new file mode 100644 index 0000000000..d3be3a4c4a --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_constants.py @@ -0,0 +1,59 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Private constants for the package.""" + +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.links import links + +TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = { + base.Subscription.Kind.NONE: links.Ticket.Subscription.NONE, + base.Subscription.Kind.TERMINATION_ONLY: + links.Ticket.Subscription.TERMINATION, + base.Subscription.Kind.FULL: links.Ticket.Subscription.FULL, + } + +# Mapping from abortive operation outcome to ticket termination to be +# sent to the other side of the operation, or None to indicate that no +# ticket should be sent to the other side in the event of such an +# outcome. +ABORTION_OUTCOME_TO_TICKET_TERMINATION = { + base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION, + base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION, + base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN, + base.Outcome.REMOTE_SHUTDOWN: None, + base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE, + base.Outcome.TRANSMISSION_FAILURE: None, + base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE, + base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE, +} + +INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:' +TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE = ( + 'Exception calling termination callback!') diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py new file mode 100644 index 0000000000..24a12b612e --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_context.py @@ -0,0 +1,92 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation context.""" + +import time + +# _interfaces is referenced from specification in this module. +from grpc.framework.core import _interfaces # pylint: disable=unused-import +from grpc.framework.interfaces.base import base + + +class OperationContext(base.OperationContext): + """An implementation of interfaces.OperationContext.""" + + def __init__( + self, lock, termination_manager, transmission_manager, + expiration_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + + def _abort(self, outcome): + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + def outcome(self): + """See base.OperationContext.outcome for specification.""" + with self._lock: + return self._termination_manager.outcome + + def add_termination_callback(self, callback): + """See base.OperationContext.add_termination_callback.""" + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.add_callback(callback) + return None + else: + return self._termination_manager.outcome + + def time_remaining(self): + """See base.OperationContext.time_remaining for specification.""" + with self._lock: + deadline = self._expiration_manager.deadline() + return max(0.0, deadline - time.time()) + + def cancel(self): + """See base.OperationContext.cancel for specification.""" + self._abort(base.Outcome.CANCELLED) + + def fail(self, exception): + """See base.OperationContext.fail for specification.""" + self._abort(base.Outcome.LOCAL_FAILURE) diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py new file mode 100644 index 0000000000..7c702ab2ce --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_emission.py @@ -0,0 +1,97 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for handling emitted values.""" + +from grpc.framework.core import _interfaces +from grpc.framework.interfaces.base import base + + +class EmissionManager(_interfaces.EmissionManager): + """An EmissionManager implementation.""" + + def __init__( + self, lock, termination_manager, transmission_manager, + expiration_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + self._ingestion_manager = None + + self._initial_metadata_seen = False + self._payload_seen = False + self._completion_seen = False + + def set_ingestion_manager(self, ingestion_manager): + """Sets the ingestion manager with which this manager will cooperate. + + Args: + ingestion_manager: The _interfaces.IngestionManager for the operation. + """ + self._ingestion_manager = ingestion_manager + + def advance( + self, initial_metadata=None, payload=None, completion=None, + allowance=None): + initial_metadata_present = initial_metadata is not None + payload_present = payload is not None + completion_present = completion is not None + allowance_present = allowance is not None + with self._lock: + if self._termination_manager.outcome is None: + if (initial_metadata_present and ( + self._initial_metadata_seen or self._payload_seen or + self._completion_seen) or + payload_present and self._completion_seen or + completion_present and self._completion_seen or + allowance_present and allowance <= 0): + self._termination_manager.abort(base.Outcome.LOCAL_FAILURE) + self._transmission_manager.abort(base.Outcome.LOCAL_FAILURE) + self._expiration_manager.terminate() + else: + self._initial_metadata_seen |= initial_metadata_present + self._payload_seen |= payload_present + self._completion_seen |= completion_present + if completion_present: + self._termination_manager.emission_complete() + self._ingestion_manager.local_emissions_done() + self._transmission_manager.advance( + initial_metadata, payload, completion, allowance) + if allowance_present: + self._ingestion_manager.add_local_allowance(allowance) diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py new file mode 100644 index 0000000000..fb2c532df6 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_end.py @@ -0,0 +1,251 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Implementation of base.End.""" + +import abc +import enum +import threading +import uuid + +from grpc.framework.core import _operation +from grpc.framework.core import _utilities +from grpc.framework.foundation import callable_util +from grpc.framework.foundation import later +from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.links import links +from grpc.framework.interfaces.links import utilities + +_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!' + + +class End(base.End, links.Link): + """A bridge between base.End and links.Link. + + Implementations of this interface translate arriving tickets into + calls on application objects implementing base interfaces and + translate calls from application objects implementing base interfaces + into tickets sent to a joined link. + """ + __metaclass__ = abc.ABCMeta + + +class _Cycle(object): + """State for a single start-stop End lifecycle.""" + + def __init__(self, pool): + self.pool = pool + self.grace = False + self.futures = [] + self.operations = {} + self.idle_actions = [] + + +def _abort(operations): + for operation in operations: + operation.abort(base.Outcome.LOCAL_SHUTDOWN) + + +def _cancel_futures(futures): + for future in futures: + futures.cancel() + + +def _future_shutdown(lock, cycle, event): + def in_future(): + with lock: + _abort(cycle.operations.values()) + _cancel_futures(cycle.futures) + pool = cycle.pool + cycle.pool.shutdown(wait=True) + return in_future + + +def _termination_action(lock, stats, operation_id, cycle): + """Constructs the termination action for a single operation. + + Args: + lock: A lock to hold during the termination action. + states: A mapping from base.Outcome values to integers to increment with + the outcome given to the termination action. + operation_id: The operation ID for the termination action. + cycle: A _Cycle value to be updated during the termination action. + + Returns: + A callable that takes an operation outcome as its sole parameter and that + should be used as the termination action for the operation associated + with the given operation ID. + """ + def termination_action(outcome): + with lock: + stats[outcome] += 1 + cycle.operations.pop(operation_id, None) + if not cycle.operations: + for action in cycle.idle_actions: + cycle.pool.submit(action) + cycle.idle_actions = [] + if cycle.grace: + _cancel_futures(cycle.futures) + return termination_action + + +class _End(End): + """An End implementation.""" + + def __init__(self, servicer_package): + """Constructor. + + Args: + servicer_package: A _ServicerPackage for servicing operations or None if + this end will not be used to service operations. + """ + self._lock = threading.Condition() + self._servicer_package = servicer_package + + self._stats = {outcome: 0 for outcome in base.Outcome} + + self._mate = None + + self._cycle = None + + def start(self): + """See base.End.start for specification.""" + with self._lock: + if self._cycle is not None: + raise ValueError('Tried to start a not-stopped End!') + else: + self._cycle = _Cycle(logging_pool.pool(1)) + + def stop(self, grace): + """See base.End.stop for specification.""" + with self._lock: + if self._cycle is None: + event = threading.Event() + event.set() + return event + elif not self._cycle.operations: + event = threading.Event() + self._cycle.pool.submit(event.set) + self._cycle.pool.shutdown(wait=False) + self._cycle = None + return event + else: + self._cycle.grace = True + event = threading.Event() + self._cycle.idle_actions.append(event.set) + if 0 < grace: + future = later.later( + grace, _future_shutdown(self._lock, self._cycle, event)) + self._cycle.futures.append(future) + else: + _abort(self._cycle.operations.values()) + return event + + def operate( + self, group, method, subscription, timeout, initial_metadata=None, + payload=None, completion=None): + """See base.End.operate for specification.""" + operation_id = uuid.uuid4() + with self._lock: + if self._cycle is None or self._cycle.grace: + raise ValueError('Can\'t operate on stopped or stopping End!') + termination_action = _termination_action( + self._lock, self._stats, operation_id, self._cycle) + operation = _operation.invocation_operate( + operation_id, group, method, subscription, timeout, initial_metadata, + payload, completion, self._mate.accept_ticket, termination_action, + self._cycle.pool) + self._cycle.operations[operation_id] = operation + return operation.context, operation.operator + + def operation_stats(self): + """See base.End.operation_stats for specification.""" + with self._lock: + return dict(self._stats) + + def add_idle_action(self, action): + """See base.End.add_idle_action for specification.""" + with self._lock: + if self._cycle is None: + raise ValueError('Can\'t add idle action to stopped End!') + action_with_exceptions_logged = callable_util.with_exceptions_logged( + action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE) + if self._cycle.operations: + self._cycle.idle_actions.append(action_with_exceptions_logged) + else: + self._cycle.pool.submit(action_with_exceptions_logged) + + def accept_ticket(self, ticket): + """See links.Link.accept_ticket for specification.""" + with self._lock: + if self._cycle is not None and not self._cycle.grace: + operation = self._cycle.operations.get(ticket.operation_id) + if operation is not None: + operation.handle_ticket(ticket) + elif self._servicer_package is not None: + termination_action = _termination_action( + self._lock, self._stats, ticket.operation_id, self._cycle) + operation = _operation.service_operate( + self._servicer_package, ticket, self._mate.accept_ticket, + termination_action, self._cycle.pool) + if operation is not None: + self._cycle.operations[ticket.operation_id] = operation + + def join_link(self, link): + """See links.Link.join_link for specification.""" + with self._lock: + self._mate = utilities.NULL_LINK if link is None else link + + +def serviceless_end_link(): + """Constructs an End usable only for invoking operations. + + Returns: + An End usable for translating operations into ticket exchange. + """ + return _End(None) + + +def serviceful_end_link(servicer, default_timeout, maximum_timeout): + """Constructs an End capable of servicing operations. + + Args: + servicer: An interfaces.Servicer for servicing operations. + default_timeout: A length of time in seconds to be used as the default + time alloted for a single operation. + maximum_timeout: A length of time in seconds to be used as the maximum + time alloted for a single operation. + + Returns: + An End capable of servicing the operations requested of it through ticket + exchange. + """ + return _End( + _utilities.ServicerPackage(servicer, default_timeout, maximum_timeout)) diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py new file mode 100644 index 0000000000..d94bdf2d2b --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_expiration.py @@ -0,0 +1,152 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation expiration.""" + +import time + +from grpc.framework.core import _interfaces +from grpc.framework.foundation import later +from grpc.framework.interfaces.base import base + + +class _ExpirationManager(_interfaces.ExpirationManager): + """An implementation of _interfaces.ExpirationManager.""" + + def __init__( + self, commencement, timeout, maximum_timeout, lock, termination_manager, + transmission_manager): + """Constructor. + + Args: + commencement: The time in seconds since the epoch at which the operation + began. + timeout: A length of time in seconds to allow for the operation to run. + maximum_timeout: The maximum length of time in seconds to allow for the + operation to run despite what is requested via this object's + change_timout method. + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._commencement = commencement + self._maximum_timeout = maximum_timeout + + self._timeout = timeout + self._deadline = commencement + timeout + self._index = None + self._future = None + + def _expire(self, index): + def expire(): + with self._lock: + if self._future is not None and index == self._index: + self._future = None + self._termination_manager.expire() + self._transmission_manager.abort(base.Outcome.EXPIRED) + return expire + + def start(self): + self._index = 0 + self._future = later.later(self._timeout, self._expire(0)) + + def change_timeout(self, timeout): + if self._future is not None and timeout != self._timeout: + self._future.cancel() + new_timeout = min(timeout, self._maximum_timeout) + new_index = self._index + 1 + self._timeout = new_timeout + self._deadline = self._commencement + new_timeout + self._index = new_index + delay = self._deadline - time.time() + self._future = later.later(delay, self._expire(new_index)) + if new_timeout != timeout: + self._transmission_manager.timeout(new_timeout) + + def deadline(self): + return self._deadline + + def terminate(self): + if self._future: + self._future.cancel() + self._future = None + self._deadline_index = None + + +def invocation_expiration_manager( + timeout, lock, termination_manager, transmission_manager): + """Creates an _interfaces.ExpirationManager appropriate for front-side use. + + Args: + timeout: A length of time in seconds to allow for the operation to run. + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + + Returns: + An _interfaces.ExpirationManager appropriate for invocation-side use. + """ + expiration_manager = _ExpirationManager( + time.time(), timeout, timeout, lock, termination_manager, + transmission_manager) + expiration_manager.start() + return expiration_manager + + +def service_expiration_manager( + timeout, default_timeout, maximum_timeout, lock, termination_manager, + transmission_manager): + """Creates an _interfaces.ExpirationManager appropriate for back-side use. + + Args: + timeout: A length of time in seconds to allow for the operation to run. May + be None in which case default_timeout will be used. + default_timeout: The default length of time in seconds to allow for the + operation to run if the front-side customer has not specified such a value + (or if the value they specified is not yet known). + maximum_timeout: The maximum length of time in seconds to allow for the + operation to run. + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + + Returns: + An _interfaces.ExpirationManager appropriate for service-side use. + """ + expiration_manager = _ExpirationManager( + time.time(), default_timeout if timeout is None else timeout, + maximum_timeout, lock, termination_manager, transmission_manager) + expiration_manager.start() + return expiration_manager diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py new file mode 100644 index 0000000000..59f7f8adc8 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_ingestion.py @@ -0,0 +1,410 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for ingestion during an operation.""" + +import abc +import collections + +from grpc.framework.core import _constants +from grpc.framework.core import _interfaces +from grpc.framework.foundation import abandonment +from grpc.framework.foundation import callable_util +from grpc.framework.interfaces.base import base + +_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!' +_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!' + + +class _SubscriptionCreation(collections.namedtuple( + '_SubscriptionCreation', ('subscription', 'remote_error', 'abandoned'))): + """A sum type for the outcome of ingestion initialization. + + Either subscription will be non-None, remote_error will be True, or abandoned + will be True. + + Attributes: + subscription: A base.Subscription describing the customer's interest in + operation values from the other side. + remote_error: A boolean indicating that the subscription could not be + created due to an error on the remote side of the operation. + abandoned: A boolean indicating that subscription creation was abandoned. + """ + + +class _SubscriptionCreator(object): + """Common specification of subscription-creating behavior.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def create(self, group, method): + """Creates the base.Subscription of the local customer. + + Any exceptions raised by this method should be attributed to and treated as + defects in the customer code called by this method. + + Args: + group: The group identifier of the operation. + method: The method identifier of the operation. + + Returns: + A _SubscriptionCreation describing the result of subscription creation. + """ + raise NotImplementedError() + + +class _ServiceSubscriptionCreator(_SubscriptionCreator): + """A _SubscriptionCreator appropriate for service-side use.""" + + def __init__(self, servicer, operation_context, output_operator): + """Constructor. + + Args: + servicer: The base.Servicer that will service the operation. + operation_context: A base.OperationContext for the operation to be passed + to the customer. + output_operator: A base.Operator for the operation to be passed to the + customer and to be called by the customer to accept operation data + emitted by the customer. + """ + self._servicer = servicer + self._operation_context = operation_context + self._output_operator = output_operator + + def create(self, group, method): + try: + subscription = self._servicer.service( + group, method, self._operation_context, self._output_operator) + except base.NoSuchMethodError: + return _SubscriptionCreation(None, True, False) + except abandonment.Abandoned: + return _SubscriptionCreation(None, False, True) + else: + return _SubscriptionCreation(subscription, False, False) + + +def _wrap(behavior): + def wrapped(*args, **kwargs): + try: + behavior(*args, **kwargs) + except abandonment.Abandoned: + return False + else: + return True + return wrapped + + +class _IngestionManager(_interfaces.IngestionManager): + """An implementation of _interfaces.IngestionManager.""" + + def __init__( + self, lock, pool, subscription, subscription_creator, termination_manager, + transmission_manager, expiration_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + subscription: A base.Subscription describing the customer's interest in + operation values from the other side. May be None if + subscription_creator is not None. + subscription_creator: A _SubscriptionCreator wrapping the portion of + customer code that when called returns the base.Subscription describing + the customer's interest in operation values from the other side. May be + None if subscription is not None. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + """ + self._lock = lock + self._pool = pool + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + + if subscription is None: + self._subscription_creator = subscription_creator + self._wrapped_operator = None + elif subscription.kind is base.Subscription.Kind.FULL: + self._subscription_creator = None + self._wrapped_operator = _wrap(subscription.operator.advance) + else: + # TODO(nathaniel): Support other subscriptions. + raise ValueError('Unsupported subscription "%s"!' % subscription.kind) + self._pending_initial_metadata = None + self._pending_payloads = [] + self._pending_completion = None + self._local_allowance = 1 + # A nonnegative integer or None, with None indicating that the local + # customer is done emitting anyway so there's no need to bother it by + # informing it that the remote customer has granted it further permission to + # emit. + self._remote_allowance = 0 + self._processing = False + + def _abort_internal_only(self): + self._subscription_creator = None + self._wrapped_operator = None + self._pending_initial_metadata = None + self._pending_payloads = None + self._pending_completion = None + + def _abort_and_notify(self, outcome): + self._abort_internal_only() + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + def _operator_next(self): + """Computes the next step for full-subscription ingestion. + + Returns: + An initial_metadata, payload, completion, allowance, continue quintet + indicating what operation values (if any) are available to pass into + customer code and whether or not there is anything immediately + actionable to call customer code to do. + """ + if self._wrapped_operator is None: + return None, None, None, None, False + else: + initial_metadata, payload, completion, allowance, action = [None] * 5 + if self._pending_initial_metadata is not None: + initial_metadata = self._pending_initial_metadata + self._pending_initial_metadata = None + action = True + if self._pending_payloads and 0 < self._local_allowance: + payload = self._pending_payloads.pop(0) + self._local_allowance -= 1 + action = True + if not self._pending_payloads and self._pending_completion is not None: + completion = self._pending_completion + self._pending_completion = None + action = True + if self._remote_allowance is not None and 0 < self._remote_allowance: + allowance = self._remote_allowance + self._remote_allowance = 0 + action = True + return initial_metadata, payload, completion, allowance, bool(action) + + def _operator_process( + self, wrapped_operator, initial_metadata, payload, + completion, allowance): + while True: + advance_outcome = callable_util.call_logging_exceptions( + wrapped_operator, _INGESTION_EXCEPTION_LOG_MESSAGE, + initial_metadata=initial_metadata, payload=payload, + completion=completion, allowance=allowance) + if advance_outcome.exception is None: + if advance_outcome.return_value: + with self._lock: + if self._termination_manager.outcome is not None: + return + if completion is not None: + self._termination_manager.ingestion_complete() + initial_metadata, payload, completion, allowance, moar = ( + self._operator_next()) + if not moar: + self._processing = False + return + else: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + return + else: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + return + + def _operator_post_create(self, subscription): + wrapped_operator = _wrap(subscription.operator.advance) + with self._lock: + if self._termination_manager.outcome is not None: + return + self._wrapped_operator = wrapped_operator + self._subscription_creator = None + metadata, payload, completion, allowance, moar = self._operator_next() + if not moar: + self._processing = False + return + self._operator_process( + wrapped_operator, metadata, payload, completion, allowance) + + def _create(self, subscription_creator, group, name): + outcome = callable_util.call_logging_exceptions( + subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE, + group, name) + if outcome.return_value is None: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + elif outcome.return_value.abandoned: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + elif outcome.return_value.remote_error: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.REMOTE_FAILURE) + elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL: + self._operator_post_create(outcome.return_value.subscription) + else: + # TODO(nathaniel): Support other subscriptions. + raise ValueError( + 'Unsupported "%s"!' % outcome.return_value.subscription.kind) + + def _store_advance(self, initial_metadata, payload, completion, allowance): + if initial_metadata is not None: + self._pending_initial_metadata = initial_metadata + if payload is not None: + self._pending_payloads.append(payload) + if completion is not None: + self._pending_completion = completion + if allowance is not None and self._remote_allowance is not None: + self._remote_allowance += allowance + + def _operator_advance(self, initial_metadata, payload, completion, allowance): + if self._processing: + self._store_advance(initial_metadata, payload, completion, allowance) + else: + action = False + if initial_metadata is not None: + action = True + if payload is not None: + if 0 < self._local_allowance: + self._local_allowance -= 1 + action = True + else: + self._pending_payloads.append(payload) + payload = False + if completion is not None: + if self._pending_payloads: + self._pending_completion = completion + else: + action = True + if allowance is not None and self._remote_allowance is not None: + allowance += self._remote_allowance + self._remote_allowance = 0 + action = True + if action: + self._pool.submit( + callable_util.with_exceptions_logged( + self._operator_process, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._wrapped_operator, initial_metadata, payload, completion, + allowance) + + def set_group_and_method(self, group, method): + """See _interfaces.IngestionManager.set_group_and_method for spec.""" + if self._subscription_creator is not None and not self._processing: + self._pool.submit( + callable_util.with_exceptions_logged( + self._create, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._subscription_creator, group, method) + self._processing = True + + def add_local_allowance(self, allowance): + """See _interfaces.IngestionManager.add_local_allowance for spec.""" + if any((self._subscription_creator, self._wrapped_operator,)): + self._local_allowance += allowance + if not self._processing: + initial_metadata, payload, completion, allowance, moar = ( + self._operator_next()) + if moar: + self._pool.submit( + callable_util.with_exceptions_logged( + self._operator_process, + _constants.INTERNAL_ERROR_LOG_MESSAGE), + initial_metadata, payload, completion, allowance) + + def local_emissions_done(self): + self._remote_allowance = None + + def advance(self, initial_metadata, payload, completion, allowance): + """See _interfaces.IngestionManager.advance for specification.""" + if self._subscription_creator is not None: + self._store_advance(initial_metadata, payload, completion, allowance) + elif self._wrapped_operator is not None: + self._operator_advance(initial_metadata, payload, completion, allowance) + + +def invocation_ingestion_manager( + subscription, lock, pool, termination_manager, transmission_manager, + expiration_manager): + """Creates an IngestionManager appropriate for invocation-side use. + + Args: + subscription: A base.Subscription indicating the customer's interest in the + data and results from the service-side of the operation. + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + + Returns: + An IngestionManager appropriate for invocation-side use. + """ + return _IngestionManager( + lock, pool, subscription, None, termination_manager, transmission_manager, + expiration_manager) + + +def service_ingestion_manager( + servicer, operation_context, output_operator, lock, pool, + termination_manager, transmission_manager, expiration_manager): + """Creates an IngestionManager appropriate for service-side use. + + The returned IngestionManager will require its set_group_and_name method to be + called before its advance method may be called. + + Args: + servicer: A base.Servicer for servicing the operation. + operation_context: A base.OperationContext for the operation to be passed to + the customer. + output_operator: A base.Operator for the operation to be passed to the + customer and to be called by the customer to accept operation data output + by the customer. + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + + Returns: + An IngestionManager appropriate for service-side use. + """ + subscription_creator = _ServiceSubscriptionCreator( + servicer, operation_context, output_operator) + return _IngestionManager( + lock, pool, None, subscription_creator, termination_manager, + transmission_manager, expiration_manager) diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py new file mode 100644 index 0000000000..a626b9f767 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_interfaces.py @@ -0,0 +1,308 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Package-internal interfaces.""" + +import abc + +from grpc.framework.interfaces.base import base + + +class TerminationManager(object): + """An object responsible for handling the termination of an operation. + + Attributes: + outcome: None if the operation is active or a base.Outcome value if it has + terminated. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def add_callback(self, callback): + """Registers a callback to be called on operation termination. + + If the operation has already terminated the callback will not be called. + + Args: + callback: A callable that will be passed an interfaces.Outcome value. + + Returns: + None if the operation has not yet terminated and the passed callback will + be called when it does, or a base.Outcome value describing the operation + termination if the operation has terminated and the callback will not be + called as a result of this method call. + """ + raise NotImplementedError() + + @abc.abstractmethod + def emission_complete(self): + """Indicates that emissions from customer code have completed.""" + raise NotImplementedError() + + @abc.abstractmethod + def transmission_complete(self): + """Indicates that transmissions to the remote end are complete. + + Returns: + True if the operation has terminated or False if the operation remains + ongoing. + """ + raise NotImplementedError() + + @abc.abstractmethod + def reception_complete(self): + """Indicates that reception from the other side is complete.""" + raise NotImplementedError() + + @abc.abstractmethod + def ingestion_complete(self): + """Indicates that customer code ingestion of received values is complete.""" + raise NotImplementedError() + + @abc.abstractmethod + def expire(self): + """Indicates that the operation must abort because it has taken too long.""" + raise NotImplementedError() + + @abc.abstractmethod + def abort(self, outcome): + """Indicates that the operation must abort for the indicated reason. + + Args: + outcome: An interfaces.Outcome indicating operation abortion. + """ + raise NotImplementedError() + + +class TransmissionManager(object): + """A manager responsible for transmitting to the other end of an operation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def kick_off( + self, group, method, timeout, initial_metadata, payload, completion, + allowance): + """Transmits the values associated with operation invocation.""" + raise NotImplementedError() + + @abc.abstractmethod + def advance(self, initial_metadata, payload, completion, allowance): + """Accepts values for transmission to the other end of the operation. + + Args: + initial_metadata: An initial metadata value to be transmitted to the other + side of the operation. May only ever be non-None once. + payload: A payload value. + completion: A base.Completion value. May only ever be non-None in the last + transmission to be made to the other side. + allowance: A positive integer communicating the number of additional + payloads allowed to be transmitted from the other side to this side of + the operation, or None if no additional allowance is being granted in + this call. + """ + raise NotImplementedError() + + @abc.abstractmethod + def timeout(self, timeout): + """Accepts for transmission to the other side a new timeout value. + + Args: + timeout: A positive float used as the new timeout value for the operation + to be transmitted to the other side. + """ + raise NotImplementedError() + + @abc.abstractmethod + def allowance(self, allowance): + """Indicates to this manager that the remote customer is allowing payloads. + + Args: + allowance: A positive integer indicating the number of additional payloads + the remote customer is allowing to be transmitted from this side of the + operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def remote_complete(self): + """Indicates to this manager that data from the remote side is complete.""" + raise NotImplementedError() + + @abc.abstractmethod + def abort(self, outcome): + """Indicates that the operation has aborted. + + Args: + outcome: An interfaces.Outcome for the operation. If None, indicates that + the operation abortion should not be communicated to the other side of + the operation. + """ + raise NotImplementedError() + + +class ExpirationManager(object): + """A manager responsible for aborting the operation if it runs out of time.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def change_timeout(self, timeout): + """Changes the timeout allotted for the operation. + + Operation duration is always measure from the beginning of the operation; + calling this method changes the operation's allotted time to timeout total + seconds, not timeout seconds from the time of this method call. + + Args: + timeout: A length of time in seconds to allow for the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deadline(self): + """Returns the time until which the operation is allowed to run. + + Returns: + The time (seconds since the epoch) at which the operation will expire. + """ + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self): + """Indicates to this manager that the operation has terminated.""" + raise NotImplementedError() + + +class EmissionManager(base.Operator): + """A manager of values emitted by customer code.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def advance( + self, initial_metadata=None, payload=None, completion=None, + allowance=None): + """Accepts a value emitted by customer code. + + This method should only be called by customer code. + + Args: + initial_metadata: An initial metadata value emitted by the local customer + to be sent to the other side of the operation. + payload: A payload value emitted by the local customer to be sent to the + other side of the operation. + completion: A Completion value emitted by the local customer to be sent to + the other side of the operation. + allowance: A positive integer indicating an additional number of payloads + that the local customer is willing to accept from the other side of the + operation. + """ + raise NotImplementedError() + + +class IngestionManager(object): + """A manager responsible for executing customer code. + + This name of this manager comes from its responsibility to pass successive + values from the other side of the operation into the code of the local + customer. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_group_and_method(self, group, method): + """Communicates to this IngestionManager the operation group and method. + + Args: + group: The group identifier of the operation. + method: The method identifier of the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_local_allowance(self, allowance): + """Communicates to this IngestionManager that more payloads may be ingested. + + Args: + allowance: A positive integer indicating an additional number of payloads + that the local customer is willing to ingest. + """ + raise NotImplementedError() + + @abc.abstractmethod + def local_emissions_done(self): + """Indicates to this manager that local emissions are done.""" + raise NotImplementedError() + + @abc.abstractmethod + def advance(self, initial_metadata, payload, completion, allowance): + """Advances the operation by passing values to the local customer.""" + raise NotImplementedError() + + +class ReceptionManager(object): + """A manager responsible for receiving tickets from the other end.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def receive_ticket(self, ticket): + """Handle a ticket from the other side of the operation. + + Args: + ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket + appropriate to this end of the operation and this object. + """ + raise NotImplementedError() + + +class Operation(object): + """An ongoing operation. + + Attributes: + context: A base.OperationContext object for the operation. + operator: A base.Operator object for the operation for use by the customer + of the operation. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def handle_ticket(self, ticket): + """Handle a ticket from the other side of the operation. + + Args: + ticket: A links.Ticket from the other side of the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def abort(self, outcome): + """Aborts the operation. + + Args: + outcome: A base.Outcome value indicating operation abortion. + """ + raise NotImplementedError() diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py new file mode 100644 index 0000000000..d20e40a53d --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_operation.py @@ -0,0 +1,192 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Implementation of operations.""" + +import threading + +# _utilities is referenced from specification in this module. +from grpc.framework.core import _context +from grpc.framework.core import _emission +from grpc.framework.core import _expiration +from grpc.framework.core import _ingestion +from grpc.framework.core import _interfaces +from grpc.framework.core import _reception +from grpc.framework.core import _termination +from grpc.framework.core import _transmission +from grpc.framework.core import _utilities # pylint: disable=unused-import + + +class _EasyOperation(_interfaces.Operation): + """A trivial implementation of interfaces.Operation.""" + + def __init__( + self, lock, termination_manager, transmission_manager, expiration_manager, + context, operator, reception_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + context: A base.OperationContext for use by the customer during the + operation. + operator: A base.Operator for use by the customer during the operation. + reception_manager: The _interfaces.ReceptionManager for the operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + self._reception_manager = reception_manager + + self.context = context + self.operator = operator + + def handle_ticket(self, ticket): + with self._lock: + self._reception_manager.receive_ticket(ticket) + + def abort(self, outcome): + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + +def invocation_operate( + operation_id, group, method, subscription, timeout, initial_metadata, + payload, completion, ticket_sink, termination_action, pool): + """Constructs objects necessary for front-side operation management. + + Args: + operation_id: An object identifying the operation. + group: The group identifier of the operation. + method: The method identifier of the operation. + subscription: A base.Subscription describing the customer's interest in the + results of the operation. + timeout: A length of time in seconds to allow for the operation. + initial_metadata: An initial metadata value to be sent to the other side of + the operation. May be None if the initial metadata will be passed later or + if there will be no initial metadata passed at all. + payload: The first payload value to be transmitted to the other side. May be + None if there is no such value or if the customer chose not to pass it at + operation invocation. + completion: A base.Completion value indicating the end of values passed to + the other side of the operation. + ticket_sink: A callable that accepts links.Tickets and delivers them to the + other side of the operation. + termination_action: A callable that accepts the outcome of the operation as + a base.Outcome value to be called on operation completion. + pool: A thread pool with which to do the work of the operation. + + Returns: + An _interfaces.Operation for the operation. + """ + lock = threading.Lock() + with lock: + termination_manager = _termination.invocation_termination_manager( + termination_action, pool) + transmission_manager = _transmission.TransmissionManager( + operation_id, ticket_sink, lock, pool, termination_manager) + expiration_manager = _expiration.invocation_expiration_manager( + timeout, lock, termination_manager, transmission_manager) + operation_context = _context.OperationContext( + lock, termination_manager, transmission_manager, expiration_manager) + emission_manager = _emission.EmissionManager( + lock, termination_manager, transmission_manager, expiration_manager) + ingestion_manager = _ingestion.invocation_ingestion_manager( + subscription, lock, pool, termination_manager, transmission_manager, + expiration_manager) + reception_manager = _reception.ReceptionManager( + termination_manager, transmission_manager, expiration_manager, + ingestion_manager) + + termination_manager.set_expiration_manager(expiration_manager) + transmission_manager.set_expiration_manager(expiration_manager) + emission_manager.set_ingestion_manager(ingestion_manager) + + transmission_manager.kick_off( + group, method, timeout, initial_metadata, payload, completion, None) + + return _EasyOperation( + lock, termination_manager, transmission_manager, expiration_manager, + operation_context, emission_manager, reception_manager) + + +def service_operate( + servicer_package, ticket, ticket_sink, termination_action, pool): + """Constructs an Operation for service of an operation. + + Args: + servicer_package: A _utilities.ServicerPackage to be used servicing the + operation. + ticket: The first links.Ticket received for the operation. + ticket_sink: A callable that accepts links.Tickets and delivers them to the + other side of the operation. + termination_action: A callable that accepts the outcome of the operation as + a base.Outcome value to be called on operation completion. + pool: A thread pool with which to do the work of the operation. + + Returns: + An _interfaces.Operation for the operation. + """ + lock = threading.Lock() + with lock: + termination_manager = _termination.service_termination_manager( + termination_action, pool) + transmission_manager = _transmission.TransmissionManager( + ticket.operation_id, ticket_sink, lock, pool, termination_manager) + expiration_manager = _expiration.service_expiration_manager( + ticket.timeout, servicer_package.default_timeout, + servicer_package.maximum_timeout, lock, termination_manager, + transmission_manager) + operation_context = _context.OperationContext( + lock, termination_manager, transmission_manager, expiration_manager) + emission_manager = _emission.EmissionManager( + lock, termination_manager, transmission_manager, expiration_manager) + ingestion_manager = _ingestion.service_ingestion_manager( + servicer_package.servicer, operation_context, emission_manager, lock, + pool, termination_manager, transmission_manager, expiration_manager) + reception_manager = _reception.ReceptionManager( + termination_manager, transmission_manager, expiration_manager, + ingestion_manager) + + termination_manager.set_expiration_manager(expiration_manager) + transmission_manager.set_expiration_manager(expiration_manager) + emission_manager.set_ingestion_manager(ingestion_manager) + + reception_manager.receive_ticket(ticket) + + return _EasyOperation( + lock, termination_manager, transmission_manager, expiration_manager, + operation_context, emission_manager, reception_manager) diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py new file mode 100644 index 0000000000..b64faf8146 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_reception.py @@ -0,0 +1,137 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for ticket reception.""" + +from grpc.framework.core import _interfaces +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.base import utilities +from grpc.framework.interfaces.links import links + +_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = { + links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED, + links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED, + links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN, + links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE, + links.Ticket.Termination.TRANSMISSION_FAILURE: + base.Outcome.TRANSMISSION_FAILURE, + links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE, +} + + +class ReceptionManager(_interfaces.ReceptionManager): + """A ReceptionManager based around a _Receiver passed to it.""" + + def __init__( + self, termination_manager, transmission_manager, expiration_manager, + ingestion_manager): + """Constructor. + + Args: + termination_manager: The operation's _interfaces.TerminationManager. + transmission_manager: The operation's _interfaces.TransmissionManager. + expiration_manager: The operation's _interfaces.ExpirationManager. + ingestion_manager: The operation's _interfaces.IngestionManager. + """ + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + self._ingestion_manager = ingestion_manager + + self._lowest_unseen_sequence_number = 0 + self._out_of_sequence_tickets = {} + self._aborted = False + + def _abort(self, outcome): + self._aborted = True + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + def _sequence_failure(self, ticket): + """Determines a just-arrived ticket's sequential legitimacy. + + Args: + ticket: A just-arrived ticket. + + Returns: + True if the ticket is sequentially legitimate; False otherwise. + """ + if ticket.sequence_number < self._lowest_unseen_sequence_number: + return True + elif ticket.sequence_number in self._out_of_sequence_tickets: + return True + else: + return False + + def _process_one(self, ticket): + if ticket.sequence_number == 0: + self._ingestion_manager.set_group_and_method(ticket.group, ticket.method) + if ticket.timeout is not None: + self._expiration_manager.change_timeout(ticket.timeout) + if ticket.termination is None: + completion = None + else: + completion = utilities.completion( + ticket.terminal_metadata, ticket.code, ticket.message) + self._ingestion_manager.advance( + ticket.initial_metadata, ticket.payload, completion, ticket.allowance) + if ticket.allowance is not None: + self._transmission_manager.allowance(ticket.allowance) + + def _process(self, ticket): + """Process those tickets ready to be processed. + + Args: + ticket: A just-arrived ticket the sequence number of which matches this + _ReceptionManager's _lowest_unseen_sequence_number field. + """ + while True: + self._process_one(ticket) + next_ticket = self._out_of_sequence_tickets.pop( + ticket.sequence_number + 1, None) + if next_ticket is None: + self._lowest_unseen_sequence_number = ticket.sequence_number + 1 + return + else: + ticket = next_ticket + + def receive_ticket(self, ticket): + """See _interfaces.ReceptionManager.receive_ticket for specification.""" + if self._aborted: + return + elif self._sequence_failure(ticket): + self._abort(base.Outcome.RECEPTION_FAILURE) + elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION): + outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination] + self._abort(outcome) + elif ticket.sequence_number == self._lowest_unseen_sequence_number: + self._process(ticket) + else: + self._out_of_sequence_tickets[ticket.sequence_number] = ticket diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py new file mode 100644 index 0000000000..ad9f6123d8 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_termination.py @@ -0,0 +1,212 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation termination.""" + +import abc + +from grpc.framework.core import _constants +from grpc.framework.core import _interfaces +from grpc.framework.foundation import callable_util +from grpc.framework.interfaces.base import base + + +def _invocation_completion_predicate( + unused_emission_complete, unused_transmission_complete, + unused_reception_complete, ingestion_complete): + return ingestion_complete + + +def _service_completion_predicate( + unused_emission_complete, transmission_complete, unused_reception_complete, + unused_ingestion_complete): + return transmission_complete + + +class TerminationManager(_interfaces.TerminationManager): + """A _interfaces.TransmissionManager on which another manager may be set.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_expiration_manager(self, expiration_manager): + """Sets the expiration manager with which this manager will interact. + + Args: + expiration_manager: The _interfaces.ExpirationManager associated with the + current operation. + """ + raise NotImplementedError() + + +class _TerminationManager(TerminationManager): + """An implementation of TerminationManager.""" + + def __init__(self, predicate, action, pool): + """Constructor. + + Args: + predicate: One of _invocation_completion_predicate or + _service_completion_predicate to be used to determine when the operation + has completed. + action: A behavior to pass the operation outcome on operation termination. + pool: A thread pool. + """ + self._predicate = predicate + self._action = action + self._pool = pool + self._expiration_manager = None + + self.outcome = None + self._callbacks = [] + + self._emission_complete = False + self._transmission_complete = False + self._reception_complete = False + self._ingestion_complete = False + + def set_expiration_manager(self, expiration_manager): + self._expiration_manager = expiration_manager + + def _terminate_internal_only(self, outcome): + """Terminates the operation. + + Args: + outcome: A base.Outcome describing the outcome of the operation. + """ + self.outcome = outcome + callbacks = list(self._callbacks) + self._callbacks = None + + act = callable_util.with_exceptions_logged( + self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE) + + if outcome is base.Outcome.LOCAL_FAILURE: + self._pool.submit(act, outcome) + else: + def call_callbacks_and_act(callbacks, outcome): + for callback in callbacks: + callback_outcome = callable_util.call_logging_exceptions( + callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE, + outcome) + if callback_outcome.exception is not None: + outcome = base.Outcome.LOCAL_FAILURE + break + act(outcome) + + self._pool.submit( + callable_util.with_exceptions_logged( + call_callbacks_and_act, _constants.INTERNAL_ERROR_LOG_MESSAGE), + callbacks, outcome) + + def _terminate_and_notify(self, outcome): + self._terminate_internal_only(outcome) + self._expiration_manager.terminate() + + def _perhaps_complete(self): + if self._predicate( + self._emission_complete, self._transmission_complete, + self._reception_complete, self._ingestion_complete): + self._terminate_and_notify(base.Outcome.COMPLETED) + return True + else: + return False + + def is_active(self): + """See _interfaces.TerminationManager.is_active for specification.""" + return self.outcome is None + + def add_callback(self, callback): + """See _interfaces.TerminationManager.add_callback for specification.""" + if self.outcome is None: + self._callbacks.append(callback) + return None + else: + return self.outcome + + def emission_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._emission_complete = True + self._perhaps_complete() + + def transmission_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._transmission_complete = True + return self._perhaps_complete() + else: + return False + + def reception_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._reception_complete = True + self._perhaps_complete() + + def ingestion_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._ingestion_complete = True + self._perhaps_complete() + + def expire(self): + """See _interfaces.TerminationManager.expire for specification.""" + self._terminate_internal_only(base.Outcome.EXPIRED) + + def abort(self, outcome): + """See _interfaces.TerminationManager.abort for specification.""" + self._terminate_and_notify(outcome) + + +def invocation_termination_manager(action, pool): + """Creates a TerminationManager appropriate for invocation-side use. + + Args: + action: An action to call on operation termination. + pool: A thread pool in which to execute the passed action and any + termination callbacks that are registered during the operation. + + Returns: + A TerminationManager appropriate for invocation-side use. + """ + return _TerminationManager(_invocation_completion_predicate, action, pool) + + +def service_termination_manager(action, pool): + """Creates a TerminationManager appropriate for service-side use. + + Args: + action: An action to call on operation termination. + pool: A thread pool in which to execute the passed action and any + termination callbacks that are registered during the operation. + + Returns: + A TerminationManager appropriate for service-side use. + """ + return _TerminationManager(_service_completion_predicate, action, pool) diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py new file mode 100644 index 0000000000..01894d398d --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_transmission.py @@ -0,0 +1,294 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for ticket transmission during an operation.""" + +from grpc.framework.core import _constants +from grpc.framework.core import _interfaces +from grpc.framework.foundation import callable_util +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.links import links + +_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' + + +def _explode_completion(completion): + if completion is None: + return None, None, None, None + else: + return ( + completion.terminal_metadata, completion.code, completion.message, + links.Ticket.Termination.COMPLETION) + + +class TransmissionManager(_interfaces.TransmissionManager): + """An _interfaces.TransmissionManager that sends links.Tickets.""" + + def __init__( + self, operation_id, ticket_sink, lock, pool, termination_manager): + """Constructor. + + Args: + operation_id: The operation's ID. + ticket_sink: A callable that accepts tickets and sends them to the other + side of the operation. + lock: The operation-servicing-wide lock object. + pool: A thread pool in which the work of transmitting tickets will be + performed. + termination_manager: The _interfaces.TerminationManager associated with + this operation. + """ + self._lock = lock + self._pool = pool + self._ticket_sink = ticket_sink + self._operation_id = operation_id + self._termination_manager = termination_manager + self._expiration_manager = None + + self._lowest_unused_sequence_number = 0 + self._remote_allowance = 1 + self._remote_complete = False + self._timeout = None + self._local_allowance = 0 + self._initial_metadata = None + self._payloads = [] + self._completion = None + self._aborted = False + self._abortion_outcome = None + self._transmitting = False + + def set_expiration_manager(self, expiration_manager): + """Sets the ExpirationManager with which this manager will cooperate.""" + self._expiration_manager = expiration_manager + + def _next_ticket(self): + """Creates the next ticket to be transmitted. + + Returns: + A links.Ticket to be sent to the other side of the operation or None if + there is nothing to be sent at this time. + """ + if self._aborted: + if self._abortion_outcome is None: + return None + else: + termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[ + self._abortion_outcome] + if termination is None: + return None + else: + self._abortion_outcome = None + return links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, + None, None, None, None, None, None, None, None, None, + termination) + + action = False + # TODO(nathaniel): Support other subscriptions. + local_subscription = links.Ticket.Subscription.FULL + timeout = self._timeout + if timeout is not None: + self._timeout = None + action = True + if self._local_allowance <= 0: + allowance = None + else: + allowance = self._local_allowance + self._local_allowance = 0 + action = True + initial_metadata = self._initial_metadata + if initial_metadata is not None: + self._initial_metadata = None + action = True + if not self._payloads or self._remote_allowance <= 0: + payload = None + else: + payload = self._payloads.pop(0) + self._remote_allowance -= 1 + action = True + if self._completion is None or self._payloads: + terminal_metadata, code, message, termination = None, None, None, None + else: + terminal_metadata, code, message, termination = _explode_completion( + self._completion) + self._completion = None + action = True + + if action: + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + local_subscription, timeout, allowance, initial_metadata, payload, + terminal_metadata, code, message, termination) + self._lowest_unused_sequence_number += 1 + return ticket + else: + return None + + def _transmit(self, ticket): + """Commences the transmission loop sending tickets. + + Args: + ticket: A links.Ticket to be sent to the other side of the operation. + """ + def transmit(ticket): + while True: + transmission_outcome = callable_util.call_logging_exceptions( + self._ticket_sink, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket) + if transmission_outcome.exception is None: + with self._lock: + if ticket.termination is links.Ticket.Termination.COMPLETION: + self._termination_manager.transmission_complete() + ticket = self._next_ticket() + if ticket is None: + self._transmitting = False + return + else: + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE) + self._expiration_manager.terminate() + return + + self._pool.submit(callable_util.with_exceptions_logged( + transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket) + self._transmitting = True + + def kick_off( + self, group, method, timeout, initial_metadata, payload, completion, + allowance): + """See _interfaces.TransmissionManager.kickoff for specification.""" + # TODO(nathaniel): Support other subscriptions. + subscription = links.Ticket.Subscription.FULL + terminal_metadata, code, message, termination = _explode_completion( + completion) + self._remote_allowance = 1 if payload is None else 0 + ticket = links.Ticket( + self._operation_id, 0, group, method, subscription, timeout, allowance, + initial_metadata, payload, terminal_metadata, code, message, + termination) + self._lowest_unused_sequence_number = 1 + self._transmit(ticket) + + def advance(self, initial_metadata, payload, completion, allowance): + """See _interfaces.TransmissionManager.advance for specification.""" + effective_initial_metadata = initial_metadata + effective_payload = payload + effective_completion = completion + if allowance is not None and not self._remote_complete: + effective_allowance = allowance + else: + effective_allowance = None + if self._transmitting: + if effective_initial_metadata is not None: + self._initial_metadata = effective_initial_metadata + if effective_payload is not None: + self._payloads.append(effective_payload) + if effective_completion is not None: + self._completion = effective_completion + if effective_allowance is not None: + self._local_allowance += effective_allowance + else: + if effective_payload is not None: + if 0 < self._remote_allowance: + ticket_payload = effective_payload + self._remote_allowance -= 1 + else: + self._payloads.append(effective_payload) + ticket_payload = None + else: + ticket_payload = None + if effective_completion is not None and not self._payloads: + ticket_completion = effective_completion + else: + self._completion = effective_completion + ticket_completion = None + if any( + (effective_initial_metadata, ticket_payload, ticket_completion, + effective_allowance)): + terminal_metadata, code, message, termination = _explode_completion( + completion) + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + None, None, allowance, effective_initial_metadata, ticket_payload, + terminal_metadata, code, message, termination) + self._lowest_unused_sequence_number += 1 + self._transmit(ticket) + + def timeout(self, timeout): + """See _interfaces.TransmissionManager.timeout for specification.""" + if self._transmitting: + self._timeout = timeout + else: + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + None, timeout, None, None, None, None, None, None, None) + self._lowest_unused_sequence_number += 1 + self._transmit(ticket) + + def allowance(self, allowance): + """See _interfaces.TransmissionManager.allowance for specification.""" + if self._transmitting or not self._payloads: + self._remote_allowance += allowance + else: + self._remote_allowance += allowance - 1 + payload = self._payloads.pop(0) + if self._payloads: + completion = None + else: + completion = self._completion + self._completion = None + terminal_metadata, code, message, termination = _explode_completion( + completion) + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + None, None, None, None, payload, terminal_metadata, code, message, + termination) + self._lowest_unused_sequence_number += 1 + self._transmit(ticket) + + def remote_complete(self): + """See _interfaces.TransmissionManager.remote_complete for specification.""" + self._remote_complete = True + self._local_allowance = 0 + + def abort(self, outcome): + """See _interfaces.TransmissionManager.abort for specification.""" + if self._transmitting: + self._aborted, self._abortion_outcome = True, outcome + else: + self._aborted = True + if outcome is not None: + termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[ + outcome] + if termination is not None: + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, + None, None, None, None, None, None, None, None, None, + termination) + self._transmit(ticket) diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py new file mode 100644 index 0000000000..5b0d798751 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_utilities.py @@ -0,0 +1,46 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Package-internal utilities.""" + +import collections + + +class ServicerPackage( + collections.namedtuple( + 'ServicerPackage', ('servicer', 'default_timeout', 'maximum_timeout'))): + """A trivial bundle class. + + Attributes: + servicer: A base.Servicer. + default_timeout: A float indicating the length of time in seconds to allow + for an operation invoked without a timeout. + maximum_timeout: A float indicating the maximum length of time in seconds to + allow for an operation. + """ diff --git a/src/python/grpcio/grpc/framework/core/implementations.py b/src/python/grpcio/grpc/framework/core/implementations.py new file mode 100644 index 0000000000..364a7faed4 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/implementations.py @@ -0,0 +1,62 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Entry points into the ticket-exchange-based base layer implementation.""" + +# base and links are referenced from specification in this module. +from grpc.framework.core import _end +from grpc.framework.interfaces.base import base # pylint: disable=unused-import +from grpc.framework.interfaces.links import links # pylint: disable=unused-import + + +def invocation_end_link(): + """Creates a base.End-links.Link suitable for operation invocation. + + Returns: + An object that is both a base.End and a links.Link, that supports operation + invocation, and that translates operation invocation into ticket exchange. + """ + return _end.serviceless_end_link() + + +def service_end_link(servicer, default_timeout, maximum_timeout): + """Creates a base.End-links.Link suitable for operation service. + + Args: + servicer: A base.Servicer for servicing operations. + default_timeout: A length of time in seconds to be used as the default + time alloted for a single operation. + maximum_timeout: A length of time in seconds to be used as the maximum + time alloted for a single operation. + + Returns: + An object that is both a base.End and a links.Link and that services + operations that arrive at it through ticket exchange. + """ + return _end.serviceful_end_link(servicer, default_timeout, maximum_timeout) diff --git a/src/python/grpcio/grpc/framework/interfaces/base/__init__.py b/src/python/grpcio/grpc/framework/interfaces/base/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/grpc/framework/interfaces/base/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py new file mode 100644 index 0000000000..76e0a5bdae --- /dev/null +++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py @@ -0,0 +1,290 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""The base interface of RPC Framework. + +Implementations of this interface support the conduct of "operations": +exchanges between two distinct ends of an arbitrary number of data payloads +and metadata such as a name for the operation, initial and terminal metadata +in each direction, and flow control. These operations may be used for transfers +of data, remote procedure calls, status indication, or anything else +applications choose. +""" + +# threading is referenced from specification in this module. +import abc +import enum +import threading + +# abandonment is referenced from specification in this module. +from grpc.framework.foundation import abandonment # pylint: disable=unused-import + + +class NoSuchMethodError(Exception): + """Indicates that an unrecognized operation has been called.""" + + +@enum.unique +class Outcome(enum.Enum): + """Operation outcomes.""" + + COMPLETED = 'completed' + CANCELLED = 'cancelled' + EXPIRED = 'expired' + LOCAL_SHUTDOWN = 'local shutdown' + REMOTE_SHUTDOWN = 'remote shutdown' + RECEPTION_FAILURE = 'reception failure' + TRANSMISSION_FAILURE = 'transmission failure' + LOCAL_FAILURE = 'local failure' + REMOTE_FAILURE = 'remote failure' + + +class Completion(object): + """An aggregate of the values exchanged upon operation completion. + + Attributes: + terminal_metadata: A terminal metadata value for the operaton. + code: A code value for the operation. + message: A message value for the operation. + """ + __metaclass__ = abc.ABCMeta + + +class OperationContext(object): + """Provides operation-related information and action.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def outcome(self): + """Indicates the operation's outcome (or that the operation is ongoing). + + Returns: + None if the operation is still active or the Outcome value for the + operation if it has terminated. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_termination_callback(self, callback): + """Adds a function to be called upon operation termination. + + Args: + callback: A callable to be passed an Outcome value on operation + termination. + + Returns: + None if the operation has not yet terminated and the passed callback will + later be called when it does terminate, or if the operation has already + terminated an Outcome value describing the operation termination and the + passed callback will not be called as a result of this method call. + """ + raise NotImplementedError() + + @abc.abstractmethod + def time_remaining(self): + """Describes the length of allowed time remaining for the operation. + + Returns: + A nonnegative float indicating the length of allowed time in seconds + remaining for the operation to complete before it is considered to have + timed out. Zero is returned if the operation has terminated. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + """Cancels the operation if the operation has not yet terminated.""" + raise NotImplementedError() + + @abc.abstractmethod + def fail(self, exception): + """Indicates that the operation has failed. + + Args: + exception: An exception germane to the operation failure. May be None. + """ + raise NotImplementedError() + + +class Operator(object): + """An interface through which to participate in an operation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def advance( + self, initial_metadata=None, payload=None, completion=None, + allowance=None): + """Progresses the operation. + + Args: + initial_metadata: An initial metadata value. Only one may ever be + communicated in each direction for an operation, and they must be + communicated no later than either the first payload or the completion. + payload: A payload value. + completion: A Completion value. May only ever be non-None once in either + direction, and no payloads may be passed after it has been communicated. + allowance: A positive integer communicating the number of additional + payloads allowed to be passed by the remote side of the operation. + """ + raise NotImplementedError() + + +class Subscription(object): + """Describes customer code's interest in values from the other side. + + Attributes: + kind: A Kind value describing the overall kind of this value. + termination_callback: A callable to be passed the Outcome associated with + the operation after it has terminated. Must be non-None if kind is + Kind.TERMINATION_ONLY. Must be None otherwise. + allowance: A callable behavior that accepts positive integers representing + the number of additional payloads allowed to be passed to the other side + of the operation. Must be None if kind is Kind.FULL. Must not be None + otherwise. + operator: An Operator to be passed values from the other side of the + operation. Must be non-None if kind is Kind.FULL. Must be None otherwise. + """ + + @enum.unique + class Kind(enum.Enum): + + NONE = 'none' + TERMINATION_ONLY = 'termination only' + FULL = 'full' + + +class Servicer(object): + """Interface for service implementations.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, group, method, context, output_operator): + """Services an operation. + + Args: + group: The group identifier of the operation to be serviced. + method: The method identifier of the operation to be serviced. + context: An OperationContext object affording contextual information and + actions. + output_operator: An Operator that will accept output values of the + operation. + + Returns: + A Subscription via which this object may or may not accept more values of + the operation. + + Raises: + NoSuchMethodError: If this Servicer does not handle operations with the + given group and method. + abandonment.Abandoned: If the operation has been aborted and there no + longer is any reason to service the operation. + """ + raise NotImplementedError() + + +class End(object): + """Common type for entry-point objects on both sides of an operation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def start(self): + """Starts this object's service of operations.""" + raise NotImplementedError() + + @abc.abstractmethod + def stop(self, grace): + """Stops this object's service of operations. + + This object will refuse service of new operations as soon as this method is + called but operations under way at the time of the call may be given a + grace period during which they are allowed to finish. + + Args: + grace: A duration of time in seconds to allow ongoing operations to + terminate before being forcefully terminated by the stopping of this + End. May be zero to terminate all ongoing operations and immediately + stop. + + Returns: + A threading.Event that will be set to indicate all operations having + terminated and this End having completely stopped. The returned event + may not be set until after the full grace period (if some ongoing + operation continues for the full length of the period) or it may be set + much sooner (if for example this End had no operations in progress at + the time its stop method was called). + """ + raise NotImplementedError() + + @abc.abstractmethod + def operate( + self, group, method, subscription, timeout, initial_metadata=None, + payload=None, completion=None): + """Commences an operation. + + Args: + group: The group identifier of the invoked operation. + method: The method identifier of the invoked operation. + subscription: A Subscription to which the results of the operation will be + passed. + timeout: A length of time in seconds to allow for the operation. + initial_metadata: An initial metadata value to be sent to the other side + of the operation. May be None if the initial metadata will be later + passed via the returned operator or if there will be no initial metadata + passed at all. + payload: An initial payload for the operation. + completion: A Completion value indicating the end of transmission to the + other side of the operation. + + Returns: + A pair of objects affording information about the operation and action + continuing the operation. The first element of the returned pair is an + OperationContext for the operation and the second element of the + returned pair is an Operator to which operation values not passed in + this call should later be passed. + """ + raise NotImplementedError() + + @abc.abstractmethod + def operation_stats(self): + """Reports the number of terminated operations broken down by outcome. + + Returns: + A dictionary from Outcome value to an integer identifying the number + of operations that terminated with that outcome. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_idle_action(self, action): + """Adds an action to be called when this End has no ongoing operations. + + Args: + action: A callable that accepts no arguments. + """ + raise NotImplementedError() diff --git a/src/python/grpcio/grpc/framework/interfaces/base/utilities.py b/src/python/grpcio/grpc/framework/interfaces/base/utilities.py new file mode 100644 index 0000000000..a9ee1a0981 --- /dev/null +++ b/src/python/grpcio/grpc/framework/interfaces/base/utilities.py @@ -0,0 +1,79 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for use with the base interface of RPC Framework.""" + +import collections + +from grpc.framework.interfaces.base import base + + +class _Completion( + base.Completion, + collections.namedtuple( + '_Completion', ('terminal_metadata', 'code', 'message',))): + """A trivial implementation of base.Completion.""" + + +class _Subscription( + base.Subscription, + collections.namedtuple( + '_Subscription', + ('kind', 'termination_callback', 'allowance', 'operator',))): + """A trivial implementation of base.Subscription.""" + +_NONE_SUBSCRIPTION = _Subscription( + base.Subscription.Kind.NONE, None, None, None) + + +def completion(terminal_metadata, code, message): + """Creates a base.Completion aggregating the given operation values. + + Args: + terminal_metadata: A terminal metadata value for an operaton. + code: A code value for an operation. + message: A message value for an operation. + + Returns: + A base.Completion aggregating the given operation values. + """ + return _Completion(terminal_metadata, code, message) + + +def full_subscription(operator): + """Creates a "full" base.Subscription for the given base.Operator. + + Args: + operator: A base.Operator to be used in an operation. + + Returns: + A base.Subscription of kind base.Subscription.Kind.FULL wrapping the given + base.Operator. + """ + return _Subscription(base.Subscription.Kind.FULL, None, None, operator) diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py index 5ebbac8a6f..069ff024dd 100644 --- a/src/python/grpcio/grpc/framework/interfaces/links/links.py +++ b/src/python/grpcio/grpc/framework/interfaces/links/links.py @@ -98,7 +98,7 @@ class Ticket( COMPLETION = 'completion' CANCELLATION = 'cancellation' EXPIRATION = 'expiration' - LOCAL_SHUTDOWN = 'local shutdown' + SHUTDOWN = 'shutdown' RECEPTION_FAILURE = 'reception failure' TRANSMISSION_FAILURE = 'transmission failure' LOCAL_FAILURE = 'local failure' diff --git a/src/python/grpcio_health_checking/MANIFEST.in b/src/python/grpcio_health_checking/MANIFEST.in new file mode 100644 index 0000000000..498b55f20a --- /dev/null +++ b/src/python/grpcio_health_checking/MANIFEST.in @@ -0,0 +1,2 @@ +graft grpc +include commands.py diff --git a/src/python/grpcio_health_checking/README.rst b/src/python/grpcio_health_checking/README.rst new file mode 100644 index 0000000000..600734e50d --- /dev/null +++ b/src/python/grpcio_health_checking/README.rst @@ -0,0 +1,9 @@ +gRPC Python Health Checking +=========================== + +Reference package for GRPC Python health checking. + +Dependencies +------------ + +Depends on the `grpcio` package, available from PyPI via `pip install grpcio`. diff --git a/src/python/grpcio_health_checking/commands.py b/src/python/grpcio_health_checking/commands.py new file mode 100644 index 0000000000..6a95e679c4 --- /dev/null +++ b/src/python/grpcio_health_checking/commands.py @@ -0,0 +1,80 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Provides distutils command classes for the GRPC Python setup process.""" + +import distutils +import glob +import os +import os.path +import subprocess +import sys + +import setuptools +from setuptools.command import build_py + + +class BuildProtoModules(setuptools.Command): + """Command to generate project *_pb2.py modules from proto files.""" + + description = '' + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + self.protoc_command = 'protoc' + self.grpc_python_plugin_command = distutils.spawn.find_executable( + 'grpc_python_plugin') + + def run(self): + paths = [] + root_directory = os.getcwd() + for walk_root, directories, filenames in os.walk(root_directory): + for filename in filenames: + if filename.endswith('.proto'): + paths.append(os.path.join(walk_root, filename)) + command = [ + self.protoc_command, + '--plugin=protoc-gen-python-grpc={}'.format( + self.grpc_python_plugin_command), + '-I {}'.format(root_directory), + '--python_out={}'.format(root_directory), + '--python-grpc_out={}'.format(root_directory), + ] + paths + subprocess.check_call(' '.join(command), cwd=root_directory, shell=True) + + +class BuildPy(build_py.build_py): + """Custom project build command.""" + + def run(self): + self.run_command('build_proto_modules') + build_py.build_py.run(self) diff --git a/src/python/grpcio_health_checking/grpc/__init__.py b/src/python/grpcio_health_checking/grpc/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio_health_checking/grpc/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio_health_checking/grpc/health/__init__.py b/src/python/grpcio_health_checking/grpc/health/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio_health_checking/grpc/health/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio_health_checking/grpc/health/v1alpha/__init__.py b/src/python/grpcio_health_checking/grpc/health/v1alpha/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio_health_checking/grpc/health/v1alpha/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio_health_checking/grpc/health/v1alpha/health.proto b/src/python/grpcio_health_checking/grpc/health/v1alpha/health.proto new file mode 100644 index 0000000000..57f4aaa9c0 --- /dev/null +++ b/src/python/grpcio_health_checking/grpc/health/v1alpha/health.proto @@ -0,0 +1,49 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package grpc.health.v1alpha; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + } + ServingStatus status = 1; +} + +service Health { + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); +} diff --git a/src/python/grpcio_health_checking/grpc/health/v1alpha/health.py b/src/python/grpcio_health_checking/grpc/health/v1alpha/health.py new file mode 100644 index 0000000000..9dfcd962f0 --- /dev/null +++ b/src/python/grpcio_health_checking/grpc/health/v1alpha/health.py @@ -0,0 +1,129 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Reference implementation for health checking in gRPC Python.""" + +import abc +import enum +import threading + +from grpc.health.v1alpha import health_pb2 + + +@enum.unique +class HealthStatus(enum.Enum): + """Statuses for a service mirroring the reference health.proto's values.""" + UNKNOWN = health_pb2.HealthCheckResponse.UNKNOWN + SERVING = health_pb2.HealthCheckResponse.SERVING + NOT_SERVING = health_pb2.HealthCheckResponse.NOT_SERVING + + +class _HealthServicer(health_pb2.EarlyAdopterHealthServicer): + """Servicer handling RPCs for service statuses.""" + + def __init__(self): + self._server_status_lock = threading.Lock() + self._server_status = {} + + def Check(self, request, context): + with self._server_status_lock: + if request.service not in self._server_status: + # TODO(atash): once the Python API has a way of setting the server + # status, bring us into conformance with the health check spec by + # returning the NOT_FOUND status here. + raise NotImplementedError() + else: + return health_pb2.HealthCheckResponse( + status=self._server_status[request.service].value) + + def set(service, status): + if not isinstance(status, HealthStatus): + raise TypeError('expected grpc.health.v1alpha.health.HealthStatus ' + 'for argument `status` but got {}'.format(status)) + with self._server_status_lock: + self._server_status[service] = status + + +class HealthServer(health_pb2.EarlyAdopterHealthServer): + """Interface for the reference gRPC Python health server.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def start(self): + raise NotImplementedError() + + @abc.abstractmethod + def stop(self): + raise NotImplementedError() + + @abc.abstractmethod + def set(self, service, status): + """Set the status of the given service. + + Args: + service (str): service name of the service to set the reported status of + status (HealthStatus): status to set for the specified service + """ + raise NotImplementedError() + + +class _HealthServerImplementation(HealthServer): + """Implementation for the reference gRPC Python health server.""" + + def __init__(self, server, servicer): + self._server = server + self._servicer = servicer + + def start(self): + self._server.start() + + def stop(self): + self._server.stop() + + def set(self, service, status): + self._servicer.set(service, status) + + +def create_Health_server(port, private_key=None, certificate_chain=None): + """Get a HealthServer instance. + + Args: + port (int): port number passed through to health_pb2 server creation + routine. + private_key (str): to-be-created server's desired private key + certificate_chain (str): to-be-created server's desired certificate chain + + Returns: + An instance of HealthServer (conforming thus to + EarlyAdopterHealthServer and providing a method to set server status).""" + servicer = _HealthServicer() + server = health_pb2.early_adopter_create_Health_server( + servicer, port=port, private_key=private_key, + certificate_chain=certificate_chain) + return _HealthServerImplementation(server, servicer) diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py new file mode 100644 index 0000000000..fcde0dab8c --- /dev/null +++ b/src/python/grpcio_health_checking/setup.py @@ -0,0 +1,72 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Setup module for the GRPC Python package's optional health checking.""" + +import os +import os.path +import sys + +from distutils import core as _core +import setuptools + +# Ensure we're in the proper directory whether or not we're being used by pip. +os.chdir(os.path.dirname(os.path.abspath(__file__))) + +# Break import-style to ensure we can actually find our commands module. +import commands + +_PACKAGES = ( + setuptools.find_packages('.') +) + +_PACKAGE_DIRECTORIES = { + '': '.', +} + +_INSTALL_REQUIRES = ( + 'grpcio>=0.10.0a0', +) + +_SETUP_REQUIRES = _INSTALL_REQUIRES + +_COMMAND_CLASS = { + 'build_proto_modules': commands.BuildProtoModules, + 'build_py': commands.BuildPy, +} + +setuptools.setup( + name='grpcio_health_checking', + version='0.10.0a0', + packages=list(_PACKAGES), + package_dir=_PACKAGE_DIRECTORIES, + install_requires=_INSTALL_REQUIRES, + setup_requires=_SETUP_REQUIRES, + cmdclass=_COMMAND_CLASS +) diff --git a/src/python/grpcio_test/grpc_interop/_interop_test_case.py b/src/python/grpcio_test/grpc_interop/_interop_test_case.py index ed8f7ef009..b6d06b300d 100644 --- a/src/python/grpcio_test/grpc_interop/_interop_test_case.py +++ b/src/python/grpcio_test/grpc_interop/_interop_test_case.py @@ -59,3 +59,6 @@ class InteropTestCase(object): def testCancelAfterFirstResponse(self): methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability(self.stub, None) + + def testTimeoutOnSleepingServer(self): + methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability(self.stub, None) diff --git a/src/python/grpcio_test/grpc_interop/methods.py b/src/python/grpcio_test/grpc_interop/methods.py index f4c94685ee..7a831f3cbd 100644 --- a/src/python/grpcio_test/grpc_interop/methods.py +++ b/src/python/grpcio_test/grpc_interop/methods.py @@ -33,10 +33,12 @@ import enum import json import os import threading +import time from oauth2client import client as oauth2client_client from grpc.framework.alpha import utilities +from grpc.framework.alpha import exceptions from grpc_interop import empty_pb2 from grpc_interop import messages_pb2 @@ -318,6 +320,24 @@ def _cancel_after_first_response(stub): raise ValueError('expected call to be cancelled') +def _timeout_on_sleeping_server(stub): + request_payload_size = 27182 + with stub, _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe, 0.001) + + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + payload=messages_pb2.Payload(body=b'\x00' * request_payload_size)) + pipe.add(request) + time.sleep(0.1) + try: + next(response_iterator) + except exceptions.ExpirationError: + pass + else: + raise ValueError('expected call to exceed deadline') + + def _compute_engine_creds(stub, args): response = _large_unary_common_behavior(stub, True, True) if args.default_service_account != response.username: @@ -351,6 +371,7 @@ class TestCase(enum.Enum): CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' COMPUTE_ENGINE_CREDS = 'compute_engine_creds' SERVICE_ACCOUNT_CREDS = 'service_account_creds' + TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' def test_interoperability(self, stub, args): if self is TestCase.EMPTY_UNARY: @@ -367,6 +388,8 @@ class TestCase(enum.Enum): _cancel_after_begin(stub) elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: _cancel_after_first_response(stub) + elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER: + _timeout_on_sleeping_server(stub) elif self is TestCase.COMPUTE_ENGINE_CREDS: _compute_engine_creds(stub, args) elif self is TestCase.SERVICE_ACCOUNT_CREDS: diff --git a/src/python/grpcio_test/grpc_protoc_plugin/__init__.py b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py new file mode 100644 index 0000000000..b200d129a9 --- /dev/null +++ b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py @@ -0,0 +1,541 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import argparse +import contextlib +import distutils.spawn +import errno +import itertools +import os +import pkg_resources +import shutil +import subprocess +import sys +import tempfile +import threading +import time +import unittest + +from grpc.framework.alpha import exceptions +from grpc.framework.foundation import future + +# Identifiers of entities we expect to find in the generated module. +SERVICER_IDENTIFIER = 'EarlyAdopterTestServiceServicer' +SERVER_IDENTIFIER = 'EarlyAdopterTestServiceServer' +STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub' +SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server' +STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub' + +# The timeout used in tests of RPCs that are supposed to expire. +SHORT_TIMEOUT = 2 +# The timeout used in tests of RPCs that are not supposed to expire. The +# absurdly large value doesn't matter since no passing execution of this test +# module will ever wait the duration. +LONG_TIMEOUT = 600 +NO_DELAY = 0 + + +class _ServicerMethods(object): + + def __init__(self, test_pb2, delay): + self._condition = threading.Condition() + self._delay = delay + self._paused = False + self._fail = False + self._test_pb2 = test_pb2 + + @contextlib.contextmanager + def pause(self): # pylint: disable=invalid-name + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + @contextlib.contextmanager + def fail(self): # pylint: disable=invalid-name + with self._condition: + self._fail = True + yield + with self._condition: + self._fail = False + + def _control(self): # pylint: disable=invalid-name + with self._condition: + if self._fail: + raise ValueError() + while self._paused: + self._condition.wait() + time.sleep(self._delay) + + def UnaryCall(self, request, unused_rpc_context): + response = self._test_pb2.SimpleResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * request.response_size + self._control() + return response + + def StreamingOutputCall(self, request, unused_rpc_context): + for parameter in request.response_parameters: + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + yield response + + def StreamingInputCall(self, request_iter, unused_rpc_context): + response = self._test_pb2.StreamingInputCallResponse() + aggregated_payload_size = 0 + for request in request_iter: + aggregated_payload_size += len(request.payload.payload_compressable) + response.aggregated_payload_size = aggregated_payload_size + self._control() + return response + + def FullDuplexCall(self, request_iter, unused_rpc_context): + for request in request_iter: + for parameter in request.response_parameters: + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + yield response + + def HalfDuplexCall(self, request_iter, unused_rpc_context): + responses = [] + for request in request_iter: + for parameter in request.response_parameters: + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + responses.append(response) + for response in responses: + yield response + + +@contextlib.contextmanager +def _CreateService(test_pb2, delay): + """Provides a servicer backend and a stub. + + The servicer is just the implementation + of the actual servicer passed to the face player of the python RPC + implementation; the two are detached. + + Non-zero delay puts a delay on each call to the servicer, representative of + communication latency. Timeout is the default timeout for the stub while + waiting for the service. + + Args: + test_pb2: The test_pb2 module generated by this test. + delay: Delay in seconds per response from the servicer. + + Yields: + A (servicer_methods, servicer, stub) three-tuple where servicer_methods is + the back-end of the service bound to the stub and the server and stub + are both activated and ready for use. + """ + servicer_methods = _ServicerMethods(test_pb2, delay) + + class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)): + + def UnaryCall(self, request, context): + return servicer_methods.UnaryCall(request, context) + + def StreamingOutputCall(self, request, context): + return servicer_methods.StreamingOutputCall(request, context) + + def StreamingInputCall(self, request_iter, context): + return servicer_methods.StreamingInputCall(request_iter, context) + + def FullDuplexCall(self, request_iter, context): + return servicer_methods.FullDuplexCall(request_iter, context) + + def HalfDuplexCall(self, request_iter, context): + return servicer_methods.HalfDuplexCall(request_iter, context) + + servicer = Servicer() + server = getattr( + test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, 0) + with server: + port = server.port() + stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', port) + with stub: + yield servicer_methods, stub, server + + +def _streaming_input_request_iterator(test_pb2): + for _ in range(3): + request = test_pb2.StreamingInputCallRequest() + request.payload.payload_type = test_pb2.COMPRESSABLE + request.payload.payload_compressable = 'a' + yield request + + +def _streaming_output_request(test_pb2): + request = test_pb2.StreamingOutputCallRequest() + sizes = [1, 2, 3] + request.response_parameters.add(size=sizes[0], interval_us=0) + request.response_parameters.add(size=sizes[1], interval_us=0) + request.response_parameters.add(size=sizes[2], interval_us=0) + return request + + +def _full_duplex_request_iterator(test_pb2): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=2, interval_us=0) + request.response_parameters.add(size=3, interval_us=0) + yield request + + +class PythonPluginTest(unittest.TestCase): + """Test case for the gRPC Python protoc-plugin. + + While reading these tests, remember that the futures API + (`stub.method.async()`) only gives futures for the *non-streaming* responses, + else it behaves like its blocking cousin. + """ + + def setUp(self): + # Assume that the appropriate protoc and grpc_python_plugins are on the + # path. + protoc_command = 'protoc' + protoc_plugin_filename = distutils.spawn.find_executable( + 'grpc_python_plugin') + test_proto_filename = pkg_resources.resource_filename( + 'grpc_protoc_plugin', 'test.proto') + if not os.path.isfile(protoc_command): + # Assume that if we haven't built protoc that it's on the system. + protoc_command = 'protoc' + + # Ensure that the output directory exists. + self.outdir = tempfile.mkdtemp() + + # Invoke protoc with the plugin. + cmd = [ + protoc_command, + '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename, + '-I .', + '--python_out=%s' % self.outdir, + '--python-grpc_out=%s' % self.outdir, + os.path.basename(test_proto_filename), + ] + subprocess.check_call(' '.join(cmd), shell=True, env=os.environ, + cwd=os.path.dirname(test_proto_filename)) + sys.path.append(self.outdir) + + def tearDown(self): + try: + shutil.rmtree(self.outdir) + except OSError as exc: + if exc.errno != errno.ENOENT: + raise + + # TODO(atash): Figure out which of these tests is hanging flakily with small + # probability. + + def testImportAttributes(self): + # check that we can access the generated module and its members. + import test_pb2 # pylint: disable=g-import-not-at-top + self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, SERVER_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None)) + + def testUpDown(self): + import test_pb2 + with _CreateService( + test_pb2, NO_DELAY) as (servicer, stub, unused_server): + request = test_pb2.SimpleRequest(response_size=13) + + def testUnaryCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. + request = test_pb2.SimpleRequest(response_size=13) + response = stub.UnaryCall(request, timeout) + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testUnaryCallAsync(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = test_pb2.SimpleRequest(response_size=13) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + # Check that the call does not block waiting for the server to respond. + with methods.pause(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) + response = response_future.result() + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testUnaryCallAsyncExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + request = test_pb2.SimpleRequest(response_size=13) + with methods.pause(): + response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testUnaryCallAsyncCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = test_pb2.SimpleRequest(response_size=13) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + response_future = stub.UnaryCall.async(request, 1) + response_future.cancel() + self.assertTrue(response_future.cancelled()) + + def testUnaryCallAsyncFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = test_pb2.SimpleRequest(response_size=13) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) + self.assertIsNotNone(response_future.exception()) + + def testStreamingOutputCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + responses = stub.StreamingOutputCall(request, LONG_TIMEOUT) + expected_responses = methods.StreamingOutputCall( + request, 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testStreamingOutputCallExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + list(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testStreamingOutputCallCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + unused_methods, stub, unused_server): + responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) + next(responses) + responses.cancel() + with self.assertRaises(future.CancelledError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this times out ' + 'instead of raising the proper error.') + def testStreamingOutputCallFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + responses = stub.StreamingOutputCall(request, 1) + self.assertIsNotNone(responses) + with self.assertRaises(exceptions.ServicerError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testStreamingInputCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + response = stub.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testStreamingInputCallAsync(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) + response = response_future.result() + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testStreamingInputCallAsyncExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + + def testStreamingInputCallAsyncCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), timeout) + response_future.cancel() + self.assertTrue(response_future.cancelled()) + with self.assertRaises(future.CancelledError): + response_future.result() + + def testStreamingInputCallAsyncFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) + self.assertIsNotNone(response_future.exception()) + + def testFullDuplexCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + responses = stub.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT) + expected_responses = methods.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testFullDuplexCallExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request_iterator = _full_duplex_request_iterator(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + list(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testFullDuplexCallCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + request_iterator = _full_duplex_request_iterator(test_pb2) + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) + next(responses) + responses.cancel() + with self.assertRaises(future.CancelledError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this hangs forever ' + 'and fix.') + def testFullDuplexCallFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request_iterator = _full_duplex_request_iterator(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) + self.assertIsNotNone(responses) + with self.assertRaises(exceptions.ServicerError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testHalfDuplexCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + def half_duplex_request_iterator(): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=2, interval_us=0) + request.response_parameters.add(size=3, interval_us=0) + yield request + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), LONG_TIMEOUT) + expected_responses = methods.HalfDuplexCall( + half_duplex_request_iterator(), 'not a real RpcContext!') + for check in itertools.izip_longest(expected_responses, responses): + expected_response, response = check + self.assertEqual(expected_response, response) + + def testHalfDuplexCallWedged(self): + import test_pb2 # pylint: disable=g-import-not-at-top + condition = threading.Condition() + wait_cell = [False] + @contextlib.contextmanager + def wait(): # pylint: disable=invalid-name + # Where's Python 3's 'nonlocal' statement when you need it? + with condition: + wait_cell[0] = True + yield + with condition: + wait_cell[0] = False + condition.notify_all() + def half_duplex_request_iterator(): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + with condition: + while wait_cell[0]: + condition.wait() + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + with wait(): + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), SHORT_TIMEOUT) + # half-duplex waits for the client to send all info + with self.assertRaises(exceptions.ExpirationError): + next(responses) + + +if __name__ == '__main__': + os.chdir(os.path.dirname(sys.argv[0])) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_protoc_plugin/test.proto b/src/python/grpcio_test/grpc_protoc_plugin/test.proto new file mode 100644 index 0000000000..ed7c6a7b79 --- /dev/null +++ b/src/python/grpcio_test/grpc_protoc_plugin/test.proto @@ -0,0 +1,139 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. +// This file is duplicated around the code base. See GitHub issue #526. +syntax = "proto2"; + +package grpc.testing; + +enum PayloadType { + // Compressable text format. + COMPRESSABLE= 1; + + // Uncompressable binary format. + UNCOMPRESSABLE = 2; + + // Randomly chosen from all other formats defined in this enum. + RANDOM = 3; +} + +message Payload { + required PayloadType payload_type = 1; + oneof payload_body { + string payload_compressable = 2; + bytes payload_uncompressable = 3; + } +} + +message SimpleRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, server randomly chooses one from other formats. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + // Desired payload size in the response from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + optional int32 response_size = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message SimpleResponse { + optional Payload payload = 1; +} + +message StreamingInputCallRequest { + // Optional input payload sent along with the request. + optional Payload payload = 1; + + // Not expecting any payload from the response. +} + +message StreamingInputCallResponse { + // Aggregated size of payloads received from the client. + optional int32 aggregated_payload_size = 1; +} + +message ResponseParameters { + // Desired payload sizes in responses from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + required int32 size = 1; + + // Desired interval between consecutive responses in the response stream in + // microseconds. + required int32 interval_us = 2; +} + +message StreamingOutputCallRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, the payload from each response in the stream + // might be of different types. This is to simulate a mixed type of payload + // stream. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + repeated ResponseParameters response_parameters = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message StreamingOutputCallResponse { + optional Payload payload = 1; +} + +service TestService { + // One request followed by one response. + // The server returns the client payload as-is. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by a sequence of responses (streamed download). + // The server returns the payload with client desired type and sizes. + rpc StreamingOutputCall(StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by one response (streamed upload). + // The server returns the aggregated size of client payload as the result. + rpc StreamingInputCall(stream StreamingInputCallRequest) + returns (StreamingInputCallResponse); + + // A sequence of requests with each request served by the server immediately. + // As one request could lead to multiple responses, this interface + // demonstrates the idea of full duplexing. + rpc FullDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by a sequence of responses. + // The server buffers all the client requests and then serves them in order. A + // stream of responses are returned to the client when the server starts with + // first request. + rpc HalfDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); +} diff --git a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py index 9a8edfad0c..44fe760fbc 100644 --- a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py +++ b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py @@ -31,11 +31,12 @@ import threading import time import unittest +from grpc import _grpcio_metadata from grpc._adapter import _types from grpc._adapter import _low -def WaitForEvents(completion_queues, deadline): +def wait_for_events(completion_queues, deadline): """ Args: completion_queues: list of completion queues to wait for events on @@ -62,6 +63,7 @@ def WaitForEvents(completion_queues, deadline): thread.join() return results + class InsecureServerInsecureClient(unittest.TestCase): def setUp(self): @@ -115,7 +117,7 @@ class InsecureServerInsecureClient(unittest.TestCase): client_initial_metadata = [(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)] client_start_batch_result = client_call.start_batch([ _types.OpArgs.send_initial_metadata(client_initial_metadata), - _types.OpArgs.send_message(REQUEST), + _types.OpArgs.send_message(REQUEST, 0), _types.OpArgs.send_close_from_client(), _types.OpArgs.recv_initial_metadata(), _types.OpArgs.recv_message(), @@ -123,20 +125,34 @@ class InsecureServerInsecureClient(unittest.TestCase): ], client_call_tag) self.assertEquals(_types.CallError.OK, client_start_batch_result) - client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2) + client_no_event, request_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 2) self.assertEquals(client_no_event, None) self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type) self.assertIsInstance(request_event.call, _low.Call) self.assertIs(server_request_tag, request_event.tag) self.assertEquals(1, len(request_event.results)) - got_initial_metadata = dict(request_event.results[0].initial_metadata) + received_initial_metadata = dict(request_event.results[0].initial_metadata) + # Check that our metadata were transmitted self.assertEquals( dict(client_initial_metadata), - dict((x, got_initial_metadata[x]) for x in zip(*client_initial_metadata)[0])) + dict((x, received_initial_metadata[x]) for x in zip(*client_initial_metadata)[0])) + # Check that Python's user agent string is a part of the full user agent + # string + self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__), + received_initial_metadata['user-agent']) self.assertEquals(METHOD, request_event.call_details.method) self.assertEquals(HOST, request_event.call_details.host) self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE) + # Check that the channel is connected, and that both it and the call have + # the proper target and peer; do this after the first flurry of messages to + # avoid the possibility that connection was delayed by the core until the + # first message was sent. + self.assertEqual(_types.ConnectivityState.READY, + self.client_channel.check_connectivity_state(False)) + self.assertIsNotNone(self.client_channel.target()) + self.assertIsNotNone(client_call.peer()) + server_call_tag = object() server_call = request_event.call server_initial_metadata = [(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE)] @@ -144,13 +160,13 @@ class InsecureServerInsecureClient(unittest.TestCase): server_start_batch_result = server_call.start_batch([ _types.OpArgs.send_initial_metadata(server_initial_metadata), _types.OpArgs.recv_message(), - _types.OpArgs.send_message(RESPONSE), + _types.OpArgs.send_message(RESPONSE, 0), _types.OpArgs.recv_close_on_server(), _types.OpArgs.send_status_from_server(server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS) ], server_call_tag) self.assertEquals(_types.CallError.OK, server_start_batch_result) - client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1) + client_event, server_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 1) self.assertEquals(6, len(client_event.results)) found_client_op_types = set() diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py new file mode 100644 index 0000000000..72b1ae5642 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py @@ -0,0 +1,165 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests the RPC Framework Core's implementation of the Base interface.""" + +import collections +import logging +import random +import time +import unittest + +from grpc._adapter import _intermediary_low +from grpc._links import invocation +from grpc._links import service +from grpc.framework.core import implementations +from grpc.framework.interfaces.base import utilities +from grpc_test import test_common as grpc_test_common +from grpc_test.framework.common import test_constants +from grpc_test.framework.interfaces.base import test_cases +from grpc_test.framework.interfaces.base import test_interfaces + +_INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),) +_SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),) +_SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),) +_CODE = _intermediary_low.Code.OK +_MESSAGE = b'test message' + + +class _SerializationBehaviors( + collections.namedtuple( + '_SerializationBehaviors', + ('request_serializers', 'request_deserializers', 'response_serializers', + 'response_deserializers',))): + pass + + +class _Links( + collections.namedtuple( + '_Links', + ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link', + 'service_end_link'))): + pass + + +def _serialization_behaviors_from_serializations(serializations): + request_serializers = {} + request_deserializers = {} + response_serializers = {} + response_deserializers = {} + for (group, method), serialization in serializations.iteritems(): + request_serializers[group, method] = serialization.serialize_request + request_deserializers[group, method] = serialization.deserialize_request + response_serializers[group, method] = serialization.serialize_response + response_deserializers[group, method] = serialization.deserialize_response + return _SerializationBehaviors( + request_serializers, request_deserializers, response_serializers, + response_deserializers) + + +class _Implementation(test_interfaces.Implementation): + + def instantiate(self, serializations, servicer): + serialization_behaviors = _serialization_behaviors_from_serializations( + serializations) + invocation_end_link = implementations.invocation_end_link() + service_end_link = implementations.service_end_link( + servicer, test_constants.DEFAULT_TIMEOUT, + test_constants.MAXIMUM_TIMEOUT) + service_grpc_link = service.service_link( + serialization_behaviors.request_deserializers, + serialization_behaviors.response_serializers) + port = service_grpc_link.add_port(0, None) + channel = _intermediary_low.Channel('localhost:%d' % port, None) + invocation_grpc_link = invocation.invocation_link( + channel, b'localhost', + serialization_behaviors.request_serializers, + serialization_behaviors.response_deserializers) + + invocation_end_link.join_link(invocation_grpc_link) + invocation_grpc_link.join_link(invocation_end_link) + service_end_link.join_link(service_grpc_link) + service_grpc_link.join_link(service_end_link) + invocation_grpc_link.start() + service_grpc_link.start() + return invocation_end_link, service_end_link, ( + invocation_grpc_link, service_grpc_link) + + def destantiate(self, memo): + invocation_grpc_link, service_grpc_link = memo + invocation_grpc_link.stop() + service_grpc_link.stop_gracefully() + + def invocation_initial_metadata(self): + return _INVOCATION_INITIAL_METADATA + + def service_initial_metadata(self): + return _SERVICE_INITIAL_METADATA + + def invocation_completion(self): + return utilities.completion(None, None, None) + + def service_completion(self): + return utilities.completion(_SERVICE_TERMINAL_METADATA, _CODE, _MESSAGE) + + def metadata_transmitted(self, original_metadata, transmitted_metadata): + return original_metadata is None or grpc_test_common.metadata_transmitted( + original_metadata, transmitted_metadata) + + def completion_transmitted(self, original_completion, transmitted_completion): + if (original_completion.terminal_metadata is not None and + not grpc_test_common.metadata_transmitted( + original_completion.terminal_metadata, + transmitted_completion.terminal_metadata)): + return False + elif original_completion.code is not transmitted_completion.code: + return False + elif original_completion.message != transmitted_completion.message: + return False + else: + return True + + +def setUpModule(): + logging.warn('setUpModule!') + + +def tearDownModule(): + logging.warn('tearDownModule!') + + +def load_tests(loader, tests, pattern): + return unittest.TestSuite( + tests=tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in test_cases.test_cases(_Implementation()))) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_test/framework/common/test_constants.py b/src/python/grpcio_test/grpc_test/framework/common/test_constants.py index 3126d0d82c..e1d3c2709d 100644 --- a/src/python/grpcio_test/grpc_test/framework/common/test_constants.py +++ b/src/python/grpcio_test/grpc_test/framework/common/test_constants.py @@ -29,15 +29,25 @@ """Constants shared among tests throughout RPC Framework.""" +# Value for maximum duration in seconds that a test is allowed for its actual +# behavioral logic, excluding all time spent deliberately waiting in the test. +TIME_ALLOWANCE = 10 # Value for maximum duration in seconds of RPCs that may time out as part of a # test. SHORT_TIMEOUT = 4 # Absurdly large value for maximum duration in seconds for should-not-time-out # RPCs made during tests. LONG_TIMEOUT = 3000 +# Values to supply on construction of an object that will service RPCs; these +# should not be used as the actual timeout values of any RPCs made during tests. +DEFAULT_TIMEOUT = 300 +MAXIMUM_TIMEOUT = 3600 # The number of payloads to transmit in streaming tests. STREAM_LENGTH = 200 +# The size of payloads to transmit in tests. +PAYLOAD_SIZE = 256 * 1024 + 17 + # The size of thread pools to use in tests. POOL_SIZE = 10 diff --git a/src/python/grpcio_test/grpc_test/framework/core/__init__.py b/src/python/grpcio_test/grpc_test/framework/core/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/core/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py new file mode 100644 index 0000000000..8d72f131d5 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py @@ -0,0 +1,96 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests the RPC Framework Core's implementation of the Base interface.""" + +import logging +import random +import time +import unittest + +from grpc.framework.core import implementations +from grpc.framework.interfaces.base import utilities +from grpc_test.framework.common import test_constants +from grpc_test.framework.interfaces.base import test_cases +from grpc_test.framework.interfaces.base import test_interfaces + + +class _Implementation(test_interfaces.Implementation): + + def __init__(self): + self._invocation_initial_metadata = object() + self._service_initial_metadata = object() + self._invocation_terminal_metadata = object() + self._service_terminal_metadata = object() + + def instantiate(self, serializations, servicer): + invocation = implementations.invocation_end_link() + service = implementations.service_end_link( + servicer, test_constants.DEFAULT_TIMEOUT, + test_constants.MAXIMUM_TIMEOUT) + invocation.join_link(service) + service.join_link(invocation) + return invocation, service, None + + def destantiate(self, memo): + pass + + def invocation_initial_metadata(self): + return self._invocation_initial_metadata + + def service_initial_metadata(self): + return self._service_initial_metadata + + def invocation_completion(self): + return utilities.completion(self._invocation_terminal_metadata, None, None) + + def service_completion(self): + return utilities.completion(self._service_terminal_metadata, None, None) + + def metadata_transmitted(self, original_metadata, transmitted_metadata): + return transmitted_metadata is original_metadata + + def completion_transmitted(self, original_completion, transmitted_completion): + return ( + (original_completion.terminal_metadata is + transmitted_completion.terminal_metadata) and + original_completion.code is transmitted_completion.code and + original_completion.message is transmitted_completion.message + ) + + +def load_tests(loader, tests, pattern): + return unittest.TestSuite( + tests=tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in test_cases.test_cases(_Implementation()))) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py index 7e1158f96b..251e1eb68e 100644 --- a/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py +++ b/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py @@ -34,15 +34,13 @@ import abc import unittest # pylint: disable=unused-import from grpc.framework.face import exceptions +from grpc_test.framework.common import test_constants from grpc_test.framework.face.testing import control from grpc_test.framework.face.testing import coverage from grpc_test.framework.face.testing import digest from grpc_test.framework.face.testing import stock_service from grpc_test.framework.face.testing import test_case -_TIMEOUT = 3 -_LONG_TIMEOUT = 45 - class BlockingInvocationInlineServiceTestCase( test_case.FaceTestCase, coverage.BlockingCoverage): @@ -79,7 +77,7 @@ class BlockingInvocationInlineServiceTestCase( request = test_messages.request() response = self.stub.blocking_value_in_value_out( - name, request, _LONG_TIMEOUT) + name, request, test_constants.LONG_TIMEOUT) test_messages.verify(request, response, self) @@ -90,7 +88,7 @@ class BlockingInvocationInlineServiceTestCase( request = test_messages.request() response_iterator = self.stub.inline_value_in_stream_out( - name, request, _LONG_TIMEOUT) + name, request, test_constants.LONG_TIMEOUT) responses = list(response_iterator) test_messages.verify(request, responses, self) @@ -102,7 +100,7 @@ class BlockingInvocationInlineServiceTestCase( requests = test_messages.requests() response = self.stub.blocking_stream_in_value_out( - name, iter(requests), _LONG_TIMEOUT) + name, iter(requests), test_constants.LONG_TIMEOUT) test_messages.verify(requests, response, self) @@ -113,7 +111,7 @@ class BlockingInvocationInlineServiceTestCase( requests = test_messages.requests() response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _LONG_TIMEOUT) + name, iter(requests), test_constants.LONG_TIMEOUT) responses = list(response_iterator) test_messages.verify(requests, responses, self) @@ -126,12 +124,12 @@ class BlockingInvocationInlineServiceTestCase( second_request = test_messages.request() first_response = self.stub.blocking_value_in_value_out( - name, first_request, _TIMEOUT) + name, first_request, test_constants.SHORT_TIMEOUT) test_messages.verify(first_request, first_response, self) second_response = self.stub.blocking_value_in_value_out( - name, second_request, _TIMEOUT) + name, second_request, test_constants.SHORT_TIMEOUT) test_messages.verify(second_request, second_response, self) @@ -144,7 +142,7 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): multi_callable = self.stub.unary_unary_multi_callable(name) - multi_callable(request, _TIMEOUT) + multi_callable(request, test_constants.SHORT_TIMEOUT) def testExpiredUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -155,7 +153,7 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) list(response_iterator) def testExpiredStreamRequestUnaryResponse(self): @@ -167,7 +165,7 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): multi_callable = self.stub.stream_unary_multi_callable(name) - multi_callable(iter(requests), _TIMEOUT) + multi_callable(iter(requests), test_constants.SHORT_TIMEOUT) def testExpiredStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -178,7 +176,7 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) list(response_iterator) def testFailedUnaryRequestUnaryResponse(self): @@ -188,7 +186,8 @@ class BlockingInvocationInlineServiceTestCase( request = test_messages.request() with self.control.fail(), self.assertRaises(exceptions.ServicerError): - self.stub.blocking_value_in_value_out(name, request, _TIMEOUT) + self.stub.blocking_value_in_value_out(name, request, + test_constants.SHORT_TIMEOUT) def testFailedUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -198,7 +197,7 @@ class BlockingInvocationInlineServiceTestCase( with self.control.fail(), self.assertRaises(exceptions.ServicerError): response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) list(response_iterator) def testFailedStreamRequestUnaryResponse(self): @@ -208,7 +207,8 @@ class BlockingInvocationInlineServiceTestCase( requests = test_messages.requests() with self.control.fail(), self.assertRaises(exceptions.ServicerError): - self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT) + self.stub.blocking_stream_in_value_out(name, iter(requests), + test_constants.SHORT_TIMEOUT) def testFailedStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -218,5 +218,5 @@ class BlockingInvocationInlineServiceTestCase( with self.control.fail(), self.assertRaises(exceptions.ServicerError): response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) list(response_iterator) diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py index 18eed53d6e..9df77678eb 100644 --- a/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py +++ b/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py @@ -33,6 +33,7 @@ import abc import unittest from grpc.framework.face import interfaces +from grpc_test.framework.common import test_constants from grpc_test.framework.face.testing import callback as testing_callback from grpc_test.framework.face.testing import control from grpc_test.framework.face.testing import coverage @@ -40,8 +41,6 @@ from grpc_test.framework.face.testing import digest from grpc_test.framework.face.testing import stock_service from grpc_test.framework.face.testing import test_case -_TIMEOUT = 3 - class EventInvocationSynchronousEventServiceTestCase( test_case.FaceTestCase, coverage.FullCoverage): @@ -79,7 +78,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() self.stub.event_value_in_value_out( - name, request, callback.complete, callback.abort, _TIMEOUT) + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() response = callback.response() @@ -93,7 +93,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() self.stub.event_value_in_stream_out( - name, request, callback, callback.abort, _TIMEOUT) + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() responses = callback.responses() @@ -107,7 +108,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() unused_call, request_consumer = self.stub.event_stream_in_value_out( - name, callback.complete, callback.abort, _TIMEOUT) + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) request_consumer.terminate() @@ -124,7 +126,7 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() unused_call, request_consumer = self.stub.event_stream_in_stream_out( - name, callback, callback.abort, _TIMEOUT) + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) request_consumer.terminate() @@ -147,11 +149,11 @@ class EventInvocationSynchronousEventServiceTestCase( first_callback.complete(first_response) self.stub.event_value_in_value_out( name, second_request, second_callback.complete, - second_callback.abort, _TIMEOUT) + second_callback.abort, test_constants.SHORT_TIMEOUT) self.stub.event_value_in_value_out( name, first_request, make_second_invocation, first_callback.abort, - _TIMEOUT) + test_constants.SHORT_TIMEOUT) second_callback.block_until_terminated() first_response = first_callback.response() @@ -168,7 +170,8 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.pause(): self.stub.event_value_in_value_out( - name, request, callback.complete, callback.abort, _TIMEOUT) + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) @@ -182,7 +185,8 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.pause(): self.stub.event_value_in_stream_out( - name, request, callback, callback.abort, _TIMEOUT) + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) @@ -194,7 +198,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() self.stub.event_stream_in_value_out( - name, callback.complete, callback.abort, _TIMEOUT) + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) @@ -207,7 +212,7 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() unused_call, request_consumer = self.stub.event_stream_in_stream_out( - name, callback, callback.abort, _TIMEOUT) + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) callback.block_until_terminated() @@ -223,10 +228,12 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.fail(): self.stub.event_value_in_value_out( - name, request, callback.complete, callback.abort, _TIMEOUT) + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() - self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion()) + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, + callback.abortion()) def testFailedUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -237,10 +244,12 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.fail(): self.stub.event_value_in_stream_out( - name, request, callback, callback.abort, _TIMEOUT) + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() - self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion()) + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, + callback.abortion()) def testFailedStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -251,13 +260,15 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.fail(): unused_call, request_consumer = self.stub.event_stream_in_value_out( - name, callback.complete, callback.abort, _TIMEOUT) + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) request_consumer.terminate() callback.block_until_terminated() - self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion()) + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, + callback.abortion()) def testFailedStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -268,7 +279,7 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.fail(): unused_call, request_consumer = self.stub.event_stream_in_stream_out( - name, callback, callback.abort, _TIMEOUT) + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) request_consumer.terminate() @@ -287,10 +298,10 @@ class EventInvocationSynchronousEventServiceTestCase( self.stub.event_value_in_value_out( name, first_request, first_callback.complete, first_callback.abort, - _TIMEOUT) + test_constants.SHORT_TIMEOUT) self.stub.event_value_in_value_out( name, second_request, second_callback.complete, - second_callback.abort, _TIMEOUT) + second_callback.abort, test_constants.SHORT_TIMEOUT) first_callback.block_until_terminated() second_callback.block_until_terminated() @@ -312,7 +323,8 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.pause(): call = self.stub.event_value_in_value_out( - name, request, callback.complete, callback.abort, _TIMEOUT) + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) call.cancel() callback.block_until_terminated() @@ -326,7 +338,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() call = self.stub.event_value_in_stream_out( - name, request, callback, callback.abort, _TIMEOUT) + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) call.cancel() callback.block_until_terminated() @@ -340,7 +353,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() call, request_consumer = self.stub.event_stream_in_value_out( - name, callback.complete, callback.abort, _TIMEOUT) + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) call.cancel() @@ -355,7 +369,7 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() call, unused_request_consumer = self.stub.event_stream_in_stream_out( - name, callback, callback.abort, _TIMEOUT) + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) call.cancel() callback.block_until_terminated() diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py index 3b42914342..70d86a0422 100644 --- a/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py +++ b/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -37,13 +37,13 @@ import unittest from grpc.framework.face import exceptions from grpc.framework.foundation import future from grpc.framework.foundation import logging_pool +from grpc_test.framework.common import test_constants from grpc_test.framework.face.testing import control from grpc_test.framework.face.testing import coverage from grpc_test.framework.face.testing import digest from grpc_test.framework.face.testing import stock_service from grpc_test.framework.face.testing import test_case -_TIMEOUT = 3 _MAXIMUM_POOL_SIZE = 10 @@ -110,7 +110,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( request = test_messages.request() response_future = self.stub.future_value_in_value_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) response = response_future.result() test_messages.verify(request, response, self) @@ -122,7 +122,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( request = test_messages.request() response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) responses = list(response_iterator) test_messages.verify(request, responses, self) @@ -138,7 +138,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( # returned to calling code before the iterator yields any requests. with request_iterator.pause(): response_future = self.stub.future_stream_in_value_out( - name, request_iterator, _TIMEOUT) + name, request_iterator, test_constants.SHORT_TIMEOUT) response = response_future.result() test_messages.verify(requests, response, self) @@ -154,7 +154,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( # returned to calling code before the iterator yields any requests. with request_iterator.pause(): response_iterator = self.stub.inline_stream_in_stream_out( - name, request_iterator, _TIMEOUT) + name, request_iterator, test_constants.SHORT_TIMEOUT) responses = list(response_iterator) test_messages.verify(requests, responses, self) @@ -167,13 +167,13 @@ class FutureInvocationAsynchronousEventServiceTestCase( second_request = test_messages.request() first_response_future = self.stub.future_value_in_value_out( - name, first_request, _TIMEOUT) + name, first_request, test_constants.SHORT_TIMEOUT) first_response = first_response_future.result() test_messages.verify(first_request, first_response, self) second_response_future = self.stub.future_value_in_value_out( - name, second_request, _TIMEOUT) + name, second_request, test_constants.SHORT_TIMEOUT) second_response = second_response_future.result() test_messages.verify(second_request, second_response, self) @@ -186,7 +186,8 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): multi_callable = self.stub.unary_unary_multi_callable(name) - response_future = multi_callable.future(request, _TIMEOUT) + response_future = multi_callable.future(request, + test_constants.SHORT_TIMEOUT) self.assertIsInstance( response_future.exception(), exceptions.ExpirationError) with self.assertRaises(exceptions.ExpirationError): @@ -200,7 +201,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): list(response_iterator) @@ -212,7 +213,8 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): multi_callable = self.stub.stream_unary_multi_callable(name) - response_future = multi_callable.future(iter(requests), _TIMEOUT) + response_future = multi_callable.future(iter(requests), + test_constants.SHORT_TIMEOUT) self.assertIsInstance( response_future.exception(), exceptions.ExpirationError) with self.assertRaises(exceptions.ExpirationError): @@ -226,7 +228,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): list(response_iterator) @@ -238,7 +240,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.fail(): response_future = self.stub.future_value_in_value_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) # Because the servicer fails outside of the thread from which the # servicer-side runtime called into it its failure is @@ -261,7 +263,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( # expiration of the RPC. with self.control.fail(), self.assertRaises(exceptions.ExpirationError): response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) list(response_iterator) def testFailedStreamRequestUnaryResponse(self): @@ -272,7 +274,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.fail(): response_future = self.stub.future_stream_in_value_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) # Because the servicer fails outside of the thread from which the # servicer-side runtime called into it its failure is @@ -295,7 +297,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( # expiration of the RPC. with self.control.fail(), self.assertRaises(exceptions.ExpirationError): response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) list(response_iterator) def testParallelInvocations(self): @@ -305,10 +307,11 @@ class FutureInvocationAsynchronousEventServiceTestCase( first_request = test_messages.request() second_request = test_messages.request() + # TODO(bug 2039): use LONG_TIMEOUT instead first_response_future = self.stub.future_value_in_value_out( - name, first_request, _TIMEOUT) + name, first_request, test_constants.SHORT_TIMEOUT) second_response_future = self.stub.future_value_in_value_out( - name, second_request, _TIMEOUT) + name, second_request, test_constants.SHORT_TIMEOUT) first_response = first_response_future.result() second_response = second_response_future.result() @@ -327,7 +330,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_value_in_value_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) cancel_method_return_value = response_future.cancel() self.assertFalse(cancel_method_return_value) @@ -341,7 +344,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) response_iterator.cancel() with self.assertRaises(future.CancelledError): @@ -355,7 +358,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_stream_in_value_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) cancel_method_return_value = response_future.cancel() self.assertFalse(cancel_method_return_value) @@ -369,7 +372,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) response_iterator.cancel() with self.assertRaises(future.CancelledError): diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py new file mode 100644 index 0000000000..e4d2a7a0d7 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py @@ -0,0 +1,568 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Part of the tests of the base interface of RPC Framework.""" + +import abc +import collections +import enum +import random # pylint: disable=unused-import +import threading +import time + +from grpc.framework.interfaces.base import base +from grpc_test.framework.common import test_constants +from grpc_test.framework.interfaces.base import _sequence +from grpc_test.framework.interfaces.base import _state +from grpc_test.framework.interfaces.base import test_interfaces # pylint: disable=unused-import + +_GROUP = 'base test cases test group' +_METHOD = 'base test cases test method' + +_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE = test_constants.PAYLOAD_SIZE / 20 +_MINIMUM_PAYLOAD_SIZE = test_constants.PAYLOAD_SIZE / 600 + + +def _create_payload(randomness): + length = randomness.randint( + _MINIMUM_PAYLOAD_SIZE, test_constants.PAYLOAD_SIZE) + random_section_length = randomness.randint( + 0, min(_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE, length)) + random_section = bytes( + bytearray( + randomness.getrandbits(8) for _ in range(random_section_length))) + sevens_section = '\x07' * (length - random_section_length) + return b''.join(randomness.sample((random_section, sevens_section), 2)) + + +def _anything_in_flight(state): + return ( + state.invocation_initial_metadata_in_flight is not None or + state.invocation_payloads_in_flight or + state.invocation_completion_in_flight is not None or + state.service_initial_metadata_in_flight is not None or + state.service_payloads_in_flight or + state.service_completion_in_flight is not None or + 0 < state.invocation_allowance_in_flight or + 0 < state.service_allowance_in_flight + ) + + +def _verify_service_advance_and_update_state( + initial_metadata, payload, completion, allowance, state, implementation): + if initial_metadata is not None: + if state.invocation_initial_metadata_received: + return 'Later invocation initial metadata received: %s' % ( + initial_metadata,) + if state.invocation_payloads_received: + return 'Invocation initial metadata received after payloads: %s' % ( + state.invocation_payloads_received) + if state.invocation_completion_received: + return 'Invocation initial metadata received after invocation completion!' + if not implementation.metadata_transmitted( + state.invocation_initial_metadata_in_flight, initial_metadata): + return 'Invocation initial metadata maltransmitted: %s, %s' % ( + state.invocation_initial_metadata_in_flight, initial_metadata) + else: + state.invocation_initial_metadata_in_flight = None + state.invocation_initial_metadata_received = True + + if payload is not None: + if state.invocation_completion_received: + return 'Invocation payload received after invocation completion!' + elif not state.invocation_payloads_in_flight: + return 'Invocation payload "%s" received but not in flight!' % (payload,) + elif state.invocation_payloads_in_flight[0] != payload: + return 'Invocation payload mismatch: %s, %s' % ( + state.invocation_payloads_in_flight[0], payload) + elif state.service_side_invocation_allowance < 1: + return 'Disallowed invocation payload!' + else: + state.invocation_payloads_in_flight.pop(0) + state.invocation_payloads_received += 1 + state.service_side_invocation_allowance -= 1 + + if completion is not None: + if state.invocation_completion_received: + return 'Later invocation completion received: %s' % (completion,) + elif not implementation.completion_transmitted( + state.invocation_completion_in_flight, completion): + return 'Invocation completion maltransmitted: %s, %s' % ( + state.invocation_completion_in_flight, completion) + else: + state.invocation_completion_in_flight = None + state.invocation_completion_received = True + + if allowance is not None: + if allowance <= 0: + return 'Illegal allowance value: %s' % (allowance,) + else: + state.service_allowance_in_flight -= allowance + state.service_side_service_allowance += allowance + + +def _verify_invocation_advance_and_update_state( + initial_metadata, payload, completion, allowance, state, implementation): + if initial_metadata is not None: + if state.service_initial_metadata_received: + return 'Later service initial metadata received: %s' % (initial_metadata,) + if state.service_payloads_received: + return 'Service initial metadata received after service payloads: %s' % ( + state.service_payloads_received) + if state.service_completion_received: + return 'Service initial metadata received after service completion!' + if not implementation.metadata_transmitted( + state.service_initial_metadata_in_flight, initial_metadata): + return 'Service initial metadata maltransmitted: %s, %s' % ( + state.service_initial_metadata_in_flight, initial_metadata) + else: + state.service_initial_metadata_in_flight = None + state.service_initial_metadata_received = True + + if payload is not None: + if state.service_completion_received: + return 'Service payload received after service completion!' + elif not state.service_payloads_in_flight: + return 'Service payload "%s" received but not in flight!' % (payload,) + elif state.service_payloads_in_flight[0] != payload: + return 'Service payload mismatch: %s, %s' % ( + state.invocation_payloads_in_flight[0], payload) + elif state.invocation_side_service_allowance < 1: + return 'Disallowed service payload!' + else: + state.service_payloads_in_flight.pop(0) + state.service_payloads_received += 1 + state.invocation_side_service_allowance -= 1 + + if completion is not None: + if state.service_completion_received: + return 'Later service completion received: %s' % (completion,) + elif not implementation.completion_transmitted( + state.service_completion_in_flight, completion): + return 'Service completion maltransmitted: %s, %s' % ( + state.service_completion_in_flight, completion) + else: + state.service_completion_in_flight = None + state.service_completion_received = True + + if allowance is not None: + if allowance <= 0: + return 'Illegal allowance value: %s' % (allowance,) + else: + state.invocation_allowance_in_flight -= allowance + state.invocation_side_service_allowance += allowance + + +class Invocation( + collections.namedtuple( + 'Invocation', + ('group', 'method', 'subscription_kind', 'timeout', 'initial_metadata', + 'payload', 'completion',))): + """A description of operation invocation. + + Attributes: + group: The group identifier for the operation. + method: The method identifier for the operation. + subscription_kind: A base.Subscription.Kind value describing the kind of + subscription to use for the operation. + timeout: A duration in seconds to pass as the timeout value for the + operation. + initial_metadata: An object to pass as the initial metadata for the + operation or None. + payload: An object to pass as a payload value for the operation or None. + completion: An object to pass as a completion value for the operation or + None. + """ + + +class OnAdvance( + collections.namedtuple( + 'OnAdvance', + ('kind', 'initial_metadata', 'payload', 'completion', 'allowance'))): + """Describes action to be taken in a test in response to an advance call. + + Attributes: + kind: A Kind value describing the overall kind of response. + initial_metadata: An initial metadata value to pass to a call of the advance + method of the operator under test. Only valid if kind is Kind.ADVANCE and + may be None. + payload: A payload value to pass to a call of the advance method of the + operator under test. Only valid if kind is Kind.ADVANCE and may be None. + completion: A base.Completion value to pass to a call of the advance method + of the operator under test. Only valid if kind is Kind.ADVANCE and may be + None. + allowance: An allowance value to pass to a call of the advance method of the + operator under test. Only valid if kind is Kind.ADVANCE and may be None. + """ + + @enum.unique + class Kind(enum.Enum): + ADVANCE = 'advance' + DEFECT = 'defect' + IDLE = 'idle' + + +_DEFECT_ON_ADVANCE = OnAdvance(OnAdvance.Kind.DEFECT, None, None, None, None) +_IDLE_ON_ADVANCE = OnAdvance(OnAdvance.Kind.IDLE, None, None, None, None) + + +class Instruction( + collections.namedtuple( + 'Instruction', + ('kind', 'advance_args', 'advance_kwargs', 'conclude_success', + 'conclude_message', 'conclude_invocation_outcome', + 'conclude_service_outcome',))): + """""" + + @enum.unique + class Kind(enum.Enum): + ADVANCE = 'ADVANCE' + CANCEL = 'CANCEL' + CONCLUDE = 'CONCLUDE' + + +class Controller(object): + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def failed(self, message): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def serialize_request(self, request): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_request(self, serialized_request): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def serialize_response(self, response): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_response(self, serialized_response): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def invocation(self): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def poll(self): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def on_service_advance( + self, initial_metadata, payload, completion, allowance): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def on_invocation_advance( + self, initial_metadata, payload, completion, allowance): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def service_on_termination(self, outcome): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def invocation_on_termination(self, outcome): + """""" + raise NotImplementedError() + + +class ControllerCreator(object): + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def name(self): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def controller(self, implementation, randomness): + """""" + raise NotImplementedError() + + +class _Remainder( + collections.namedtuple( + '_Remainder', + ('invocation_payloads', 'service_payloads', 'invocation_completion', + 'service_completion',))): + """Describes work remaining to be done in a portion of a test. + + Attributes: + invocation_payloads: The number of payloads to be sent from the invocation + side of the operation to the service side of the operation. + service_payloads: The number of payloads to be sent from the service side of + the operation to the invocation side of the operation. + invocation_completion: Whether or not completion from the invocation side of + the operation should be indicated and has yet to be indicated. + service_completion: Whether or not completion from the service side of the + operation should be indicated and has yet to be indicated. + """ + + +class _SequenceController(Controller): + + def __init__(self, sequence, implementation, randomness): + """Constructor. + + Args: + sequence: A _sequence.Sequence describing the steps to be taken in the + test at a relatively high level. + implementation: A test_interfaces.Implementation encapsulating the + base interface implementation that is the system under test. + randomness: A random.Random instance for use in the test. + """ + self._condition = threading.Condition() + self._sequence = sequence + self._implementation = implementation + self._randomness = randomness + + self._until = None + self._remaining_elements = None + self._poll_next = None + self._message = None + + self._state = _state.OperationState() + self._todo = None + + # called with self._condition + def _failed(self, message): + self._message = message + self._condition.notify_all() + + def _passed(self, invocation_outcome, service_outcome): + self._poll_next = Instruction( + Instruction.Kind.CONCLUDE, None, None, True, None, invocation_outcome, + service_outcome) + self._condition.notify_all() + + def failed(self, message): + with self._condition: + self._failed(message) + + def serialize_request(self, request): + return request + request + + def deserialize_request(self, serialized_request): + return serialized_request[:len(serialized_request) / 2] + + def serialize_response(self, response): + return response * 3 + + def deserialize_response(self, serialized_response): + return serialized_response[2 * len(serialized_response) / 3:] + + def invocation(self): + with self._condition: + self._until = time.time() + self._sequence.maximum_duration + self._remaining_elements = list(self._sequence.elements) + if self._sequence.invocation.initial_metadata: + initial_metadata = self._implementation.invocation_initial_metadata() + self._state.invocation_initial_metadata_in_flight = initial_metadata + else: + initial_metadata = None + if self._sequence.invocation.payload: + payload = _create_payload(self._randomness) + self._state.invocation_payloads_in_flight.append(payload) + else: + payload = None + if self._sequence.invocation.complete: + completion = self._implementation.invocation_completion() + self._state.invocation_completion_in_flight = completion + else: + completion = None + return Invocation( + _GROUP, _METHOD, base.Subscription.Kind.FULL, + self._sequence.invocation.timeout, initial_metadata, payload, + completion) + + def poll(self): + with self._condition: + while True: + if self._message is not None: + return Instruction( + Instruction.Kind.CONCLUDE, None, None, False, self._message, None, + None) + elif self._poll_next: + poll_next = self._poll_next + self._poll_next = None + return poll_next + elif self._until < time.time(): + return Instruction( + Instruction.Kind.CONCLUDE, None, None, False, + 'overran allotted time!', None, None) + else: + self._condition.wait(timeout=self._until-time.time()) + + def on_service_advance( + self, initial_metadata, payload, completion, allowance): + with self._condition: + message = _verify_service_advance_and_update_state( + initial_metadata, payload, completion, allowance, self._state, + self._implementation) + if message is not None: + self._failed(message) + if self._todo is not None: + raise ValueError('TODO!!!') + elif _anything_in_flight(self._state): + return _IDLE_ON_ADVANCE + elif self._remaining_elements: + element = self._remaining_elements.pop(0) + if element.kind is _sequence.Element.Kind.SERVICE_TRANSMISSION: + if element.transmission.initial_metadata: + initial_metadata = self._implementation.service_initial_metadata() + self._state.service_initial_metadata_in_flight = initial_metadata + else: + initial_metadata = None + if element.transmission.payload: + payload = _create_payload(self._randomness) + self._state.service_payloads_in_flight.append(payload) + self._state.service_side_service_allowance -= 1 + else: + payload = None + if element.transmission.complete: + completion = self._implementation.service_completion() + self._state.service_completion_in_flight = completion + else: + completion = None + if (not self._state.invocation_completion_received and + 0 <= self._state.service_side_invocation_allowance): + allowance = 1 + self._state.service_side_invocation_allowance += 1 + self._state.invocation_allowance_in_flight += 1 + else: + allowance = None + return OnAdvance( + OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion, + allowance) + else: + raise ValueError('TODO!!!') + else: + return _IDLE_ON_ADVANCE + + def on_invocation_advance( + self, initial_metadata, payload, completion, allowance): + with self._condition: + message = _verify_invocation_advance_and_update_state( + initial_metadata, payload, completion, allowance, self._state, + self._implementation) + if message is not None: + self._failed(message) + if self._todo is not None: + raise ValueError('TODO!!!') + elif _anything_in_flight(self._state): + return _IDLE_ON_ADVANCE + elif self._remaining_elements: + element = self._remaining_elements.pop(0) + if element.kind is _sequence.Element.Kind.INVOCATION_TRANSMISSION: + if element.transmission.initial_metadata: + initial_metadata = self._implementation.invocation_initial_metadata() + self._state.invocation_initial_metadata_in_fight = initial_metadata + else: + initial_metadata = None + if element.transmission.payload: + payload = _create_payload(self._randomness) + self._state.invocation_payloads_in_flight.append(payload) + self._state.invocation_side_invocation_allowance -= 1 + else: + payload = None + if element.transmission.complete: + completion = self._implementation.invocation_completion() + self._state.invocation_completion_in_flight = completion + else: + completion = None + if (not self._state.service_completion_received and + 0 <= self._state.invocation_side_service_allowance): + allowance = 1 + self._state.invocation_side_service_allowance += 1 + self._state.service_allowance_in_flight += 1 + else: + allowance = None + return OnAdvance( + OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion, + allowance) + else: + raise ValueError('TODO!!!') + else: + return _IDLE_ON_ADVANCE + + def service_on_termination(self, outcome): + with self._condition: + self._state.service_side_outcome = outcome + if self._todo is not None or self._remaining_elements: + self._failed('Premature service-side outcome %s!' % (outcome,)) + elif outcome is not self._sequence.outcome.service: + self._failed( + 'Incorrect service-side outcome: %s should have been %s' % ( + outcome, self._sequence.outcome.service)) + elif self._state.invocation_side_outcome is not None: + self._passed(self._state.invocation_side_outcome, outcome) + + def invocation_on_termination(self, outcome): + with self._condition: + self._state.invocation_side_outcome = outcome + if self._todo is not None or self._remaining_elements: + self._failed('Premature invocation-side outcome %s!' % (outcome,)) + elif outcome is not self._sequence.outcome.invocation: + self._failed( + 'Incorrect invocation-side outcome: %s should have been %s' % ( + outcome, self._sequence.outcome.invocation)) + elif self._state.service_side_outcome is not None: + self._passed(outcome, self._state.service_side_outcome) + + +class _SequenceControllerCreator(ControllerCreator): + + def __init__(self, sequence): + self._sequence = sequence + + def name(self): + return self._sequence.name + + def controller(self, implementation, randomness): + return _SequenceController(self._sequence, implementation, randomness) + + +CONTROLLER_CREATORS = tuple( + _SequenceControllerCreator(sequence) for sequence in _sequence.SEQUENCES) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py new file mode 100644 index 0000000000..1d77aaebe6 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py @@ -0,0 +1,168 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Part of the tests of the base interface of RPC Framework.""" + +import collections +import enum + +from grpc.framework.interfaces.base import base +from grpc_test.framework.common import test_constants + + +class Invocation( + collections.namedtuple( + 'Invocation', ('timeout', 'initial_metadata', 'payload', 'complete',))): + """A recipe for operation invocation. + + Attributes: + timeout: A duration in seconds to pass to the system under test as the + operation's timeout value. + initial_metadata: A boolean indicating whether or not to pass initial + metadata when invoking the operation. + payload: A boolean indicating whether or not to pass a payload when + invoking the operation. + complete: A boolean indicating whether or not to indicate completion of + transmissions from the invoking side of the operation when invoking the + operation. + """ + + +class Transmission( + collections.namedtuple( + 'Transmission', ('initial_metadata', 'payload', 'complete',))): + """A recipe for a single transmission in an operation. + + Attributes: + initial_metadata: A boolean indicating whether or not to pass initial + metadata as part of the transmission. + payload: A boolean indicating whether or not to pass a payload as part of + the transmission. + complete: A boolean indicating whether or not to indicate completion of + transmission from the transmitting side of the operation as part of the + transmission. + """ + + +class Intertransmission( + collections.namedtuple('Intertransmission', ('invocation', 'service',))): + """A recipe for multiple transmissions in an operation. + + Attributes: + invocation: An integer describing the number of payloads to send from the + invocation side of the operation to the service side. + service: An integer describing the number of payloads to send from the + service side of the operation to the invocation side. + """ + + +class Element(collections.namedtuple('Element', ('kind', 'transmission',))): + """A sum type for steps to perform when testing an operation. + + Attributes: + kind: A Kind value describing the kind of step to perform in the test. + transmission: Only valid for kinds Kind.INVOCATION_TRANSMISSION and + Kind.SERVICE_TRANSMISSION, a Transmission value describing the details of + the transmission to be made. + """ + + @enum.unique + class Kind(enum.Enum): + INVOCATION_TRANSMISSION = 'invocation transmission' + SERVICE_TRANSMISSION = 'service transmission' + INTERTRANSMISSION = 'intertransmission' + INVOCATION_CANCEL = 'invocation cancel' + SERVICE_CANCEL = 'service cancel' + INVOCATION_FAILURE = 'invocation failure' + SERVICE_FAILURE = 'service failure' + + +class Outcome(collections.namedtuple('Outcome', ('invocation', 'service',))): + """A description of the expected outcome of an operation test. + + Attributes: + invocation: The base.Outcome value expected on the invocation side of the + operation. + service: The base.Outcome value expected on the service side of the + operation. + """ + + +class Sequence( + collections.namedtuple( + 'Sequence', + ('name', 'maximum_duration', 'invocation', 'elements', 'outcome',))): + """Describes at a high level steps to perform in a test. + + Attributes: + name: The string name of the sequence. + maximum_duration: A length of time in seconds to allow for the test before + declaring it to have failed. + invocation: An Invocation value describing how to invoke the operation + under test. + elements: A sequence of Element values describing at coarse granularity + actions to take during the operation under test. + outcome: An Outcome value describing the expected outcome of the test. + """ + +_EASY = Sequence( + 'Easy', + test_constants.TIME_ALLOWANCE, + Invocation(test_constants.LONG_TIMEOUT, True, True, True), + ( + Element( + Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, True)), + ), + Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED)) + +_PEASY = Sequence( + 'Peasy', + test_constants.TIME_ALLOWANCE, + Invocation(test_constants.LONG_TIMEOUT, True, True, False), + ( + Element( + Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, False)), + Element( + Element.Kind.INVOCATION_TRANSMISSION, + Transmission(False, True, True)), + Element( + Element.Kind.SERVICE_TRANSMISSION, Transmission(False, True, True)), + ), + Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED)) + + +# TODO(issue 2959): Finish this test suite. This tuple of sequences should +# contain at least the values in the Cartesian product of (half-duplex, +# full-duplex) * (zero payloads, one payload, test_constants.STREAM_LENGTH +# payloads) * (completion, cancellation, expiration, programming defect in +# servicer code). +SEQUENCES = ( + _EASY, + _PEASY, +) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py new file mode 100644 index 0000000000..21cf33aeb6 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py @@ -0,0 +1,55 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Part of the tests of the base interface of RPC Framework.""" + + +class OperationState(object): + + def __init__(self): + self.invocation_initial_metadata_in_flight = None + self.invocation_initial_metadata_received = False + self.invocation_payloads_in_flight = [] + self.invocation_payloads_received = 0 + self.invocation_completion_in_flight = None + self.invocation_completion_received = False + self.service_initial_metadata_in_flight = None + self.service_initial_metadata_received = False + self.service_payloads_in_flight = [] + self.service_payloads_received = 0 + self.service_completion_in_flight = None + self.service_completion_received = False + self.invocation_side_invocation_allowance = 1 + self.invocation_side_service_allowance = 1 + self.service_side_invocation_allowance = 1 + self.service_side_service_allowance = 1 + self.invocation_allowance_in_flight = 0 + self.service_allowance_in_flight = 0 + self.invocation_side_outcome = None + self.service_side_outcome = None diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py new file mode 100644 index 0000000000..5c8b176da4 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py @@ -0,0 +1,262 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests of the base interface of RPC Framework.""" + +import logging +import random +import threading +import time +import unittest + +from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.base import utilities +from grpc_test.framework.common import test_constants +from grpc_test.framework.interfaces.base import _control +from grpc_test.framework.interfaces.base import test_interfaces + +_SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True)) + +_EMPTY_OUTCOME_DICT = {outcome: 0 for outcome in base.Outcome} + + +class _Serialization(test_interfaces.Serialization): + + def serialize_request(self, request): + return request + request + + def deserialize_request(self, serialized_request): + return serialized_request[:len(serialized_request) / 2] + + def serialize_response(self, response): + return response * 3 + + def deserialize_response(self, serialized_response): + return serialized_response[2 * len(serialized_response) / 3:] + + +def _advance(quadruples, operator, controller): + try: + for quadruple in quadruples: + operator.advance( + initial_metadata=quadruple[0], payload=quadruple[1], + completion=quadruple[2], allowance=quadruple[3]) + except Exception as e: # pylint: disable=broad-except + controller.failed('Exception on advance: %e' % e) + + +class _Operator(base.Operator): + + def __init__(self, controller, on_advance, pool, operator_under_test): + self._condition = threading.Condition() + self._controller = controller + self._on_advance = on_advance + self._pool = pool + self._operator_under_test = operator_under_test + self._pending_advances = [] + + def set_operator_under_test(self, operator_under_test): + with self._condition: + self._operator_under_test = operator_under_test + pent_advances = self._pending_advances + self._pending_advances = [] + pool = self._pool + controller = self._controller + + if pool is None: + _advance(pent_advances, operator_under_test, controller) + else: + pool.submit(_advance, pent_advances, operator_under_test, controller) + + def advance( + self, initial_metadata=None, payload=None, completion=None, + allowance=None): + on_advance = self._on_advance( + initial_metadata, payload, completion, allowance) + if on_advance.kind is _control.OnAdvance.Kind.ADVANCE: + with self._condition: + pool = self._pool + operator_under_test = self._operator_under_test + controller = self._controller + + quadruple = ( + on_advance.initial_metadata, on_advance.payload, + on_advance.completion, on_advance.allowance) + if pool is None: + _advance((quadruple,), operator_under_test, controller) + else: + pool.submit(_advance, (quadruple,), operator_under_test, controller) + elif on_advance.kind is _control.OnAdvance.Kind.DEFECT: + raise ValueError( + 'Deliberately raised exception from Operator.advance (in a test)!') + + +class _Servicer(base.Servicer): + """An base.Servicer with instrumented for testing.""" + + def __init__(self, group, method, controllers, pool): + self._condition = threading.Condition() + self._group = group + self._method = method + self._pool = pool + self._controllers = list(controllers) + + def service(self, group, method, context, output_operator): + with self._condition: + controller = self._controllers.pop(0) + if group != self._group or method != self._method: + controller.fail( + '%s != %s or %s != %s' % (group, self._group, method, self._method)) + raise base.NoSuchMethodError() + else: + operator = _Operator( + controller, controller.on_service_advance, self._pool, + output_operator) + outcome = context.add_termination_callback( + controller.service_on_termination) + if outcome is not None: + controller.service_on_termination(outcome) + return utilities.full_subscription(operator) + + +class _OperationTest(unittest.TestCase): + + def setUp(self): + if self._synchronicity_variation: + self._pool = logging_pool.pool(test_constants.POOL_SIZE) + else: + self._pool = None + self._controller = self._controller_creator.controller( + self._implementation, self._randomness) + + def tearDown(self): + if self._synchronicity_variation: + self._pool.shutdown(wait=True) + else: + self._pool = None + + def test_operation(self): + invocation = self._controller.invocation() + if invocation.subscription_kind is base.Subscription.Kind.FULL: + test_operator = _Operator( + self._controller, self._controller.on_invocation_advance, + self._pool, None) + subscription = utilities.full_subscription(test_operator) + else: + # TODO(nathaniel): support and test other subscription kinds. + self.fail('Non-full subscriptions not yet supported!') + + servicer = _Servicer( + invocation.group, invocation.method, (self._controller,), self._pool) + + invocation_end, service_end, memo = self._implementation.instantiate( + {(invocation.group, invocation.method): _Serialization()}, servicer) + + try: + invocation_end.start() + service_end.start() + operation_context, operator_under_test = invocation_end.operate( + invocation.group, invocation.method, subscription, invocation.timeout, + initial_metadata=invocation.initial_metadata, payload=invocation.payload, + completion=invocation.completion) + test_operator.set_operator_under_test(operator_under_test) + outcome = operation_context.add_termination_callback( + self._controller.invocation_on_termination) + if outcome is not None: + self._controller.invocation_on_termination(outcome) + except Exception as e: # pylint: disable=broad-except + self._controller.failed('Exception on invocation: %s' % e) + self.fail(e) + + while True: + instruction = self._controller.poll() + if instruction.kind is _control.Instruction.Kind.ADVANCE: + try: + test_operator.advance( + *instruction.advance_args, **instruction.advance_kwargs) + except Exception as e: # pylint: disable=broad-except + self._controller.failed('Exception on instructed advance: %s' % e) + elif instruction.kind is _control.Instruction.Kind.CANCEL: + try: + operation_context.cancel() + except Exception as e: # pylint: disable=broad-except + self._controller.failed('Exception on cancel: %s' % e) + elif instruction.kind is _control.Instruction.Kind.CONCLUDE: + break + + invocation_stop_event = invocation_end.stop(0) + service_stop_event = service_end.stop(0) + invocation_stop_event.wait() + service_stop_event.wait() + invocation_stats = invocation_end.operation_stats() + service_stats = service_end.operation_stats() + + self._implementation.destantiate(memo) + + self.assertTrue( + instruction.conclude_success, msg=instruction.conclude_message) + + expected_invocation_stats = dict(_EMPTY_OUTCOME_DICT) + expected_invocation_stats[instruction.conclude_invocation_outcome] += 1 + self.assertDictEqual(expected_invocation_stats, invocation_stats) + expected_service_stats = dict(_EMPTY_OUTCOME_DICT) + expected_service_stats[instruction.conclude_service_outcome] += 1 + self.assertDictEqual(expected_service_stats, service_stats) + + +def test_cases(implementation): + """Creates unittest.TestCase classes for a given Base implementation. + + Args: + implementation: A test_interfaces.Implementation specifying creation and + destruction of the Base implementation under test. + + Returns: + A sequence of subclasses of unittest.TestCase defining tests of the + specified Base layer implementation. + """ + random_seed = hash(time.time()) + logging.warning('Random seed for this execution: %s', random_seed) + randomness = random.Random(x=random_seed) + + test_case_classes = [] + for synchronicity_variation in _SYNCHRONICITY_VARIATION: + for controller_creator in _control.CONTROLLER_CREATORS: + name = ''.join( + (synchronicity_variation[0], controller_creator.name(), 'Test',)) + test_case_classes.append( + type(name, (_OperationTest,), + {'_implementation': implementation, + '_randomness': randomness, + '_synchronicity_variation': synchronicity_variation[1], + '_controller_creator': controller_creator, + })) + + return test_case_classes diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py new file mode 100644 index 0000000000..02426ab846 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py @@ -0,0 +1,186 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces used in tests of implementations of the Base layer.""" + +import abc + +from grpc.framework.interfaces.base import base # pylint: disable=unused-import + + +class Serialization(object): + """Specifies serialization and deserialization of test payloads.""" + __metaclass__ = abc.ABCMeta + + def serialize_request(self, request): + """Serializes a request value used in a test. + + Args: + request: A request value created by a test. + + Returns: + A bytestring that is the serialization of the given request. + """ + raise NotImplementedError() + + def deserialize_request(self, serialized_request): + """Deserializes a request value used in a test. + + Args: + serialized_request: A bytestring that is the serialization of some request + used in a test. + + Returns: + The request value encoded by the given bytestring. + """ + raise NotImplementedError() + + def serialize_response(self, response): + """Serializes a response value used in a test. + + Args: + response: A response value created by a test. + + Returns: + A bytestring that is the serialization of the given response. + """ + raise NotImplementedError() + + def deserialize_response(self, serialized_response): + """Deserializes a response value used in a test. + + Args: + serialized_response: A bytestring that is the serialization of some + response used in a test. + + Returns: + The response value encoded by the given bytestring. + """ + raise NotImplementedError() + + +class Implementation(object): + """Specifies an implementation of the Base layer.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def instantiate(self, serializations, servicer): + """Instantiates the Base layer implementation to be used in a test. + + Args: + serializations: A dict from group-method pair to Serialization object + specifying how to serialize and deserialize payload values used in the + test. + servicer: A base.Servicer object to be called to service RPCs made during + the test. + + Returns: + A sequence of length three the first element of which is a + base.End to be used to invoke RPCs, the second element of which is a + base.End to be used to service invoked RPCs, and the third element of + which is an arbitrary memo object to be kept and passed to destantiate + at the conclusion of the test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def destantiate(self, memo): + """Destroys the Base layer implementation under test. + + Args: + memo: The object from the third position of the return value of a call to + instantiate. + """ + raise NotImplementedError() + + @abc.abstractmethod + def invocation_initial_metadata(self): + """Provides an operation's invocation-side initial metadata. + + Returns: + A value to use for an operation's invocation-side initial metadata, or + None. + """ + raise NotImplementedError() + + @abc.abstractmethod + def service_initial_metadata(self): + """Provices an operation's service-side initial metadata. + + Returns: + A value to use for an operation's service-side initial metadata, or + None. + """ + raise NotImplementedError() + + @abc.abstractmethod + def invocation_completion(self): + """Provides an operation's invocation-side completion. + + Returns: + A base.Completion to use for an operation's invocation-side completion. + """ + raise NotImplementedError() + + @abc.abstractmethod + def service_completion(self): + """Provides an operation's service-side completion. + + Returns: + A base.Completion to use for an operation's service-side completion. + """ + raise NotImplementedError() + + @abc.abstractmethod + def metadata_transmitted(self, original_metadata, transmitted_metadata): + """Identifies whether or not metadata was properly transmitted. + + Args: + original_metadata: A metadata value passed to the system under test. + transmitted_metadata: The same metadata value after having been + transmitted through the system under test. + + Returns: + Whether or not the metadata was properly transmitted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def completion_transmitted(self, original_completion, transmitted_completion): + """Identifies whether or not a base.Completion was properly transmitted. + + Args: + original_completion: A base.Completion passed to the system under test. + transmitted_completion: The same completion value after having been + transmitted through the system under test. + + Returns: + Whether or not the completion was properly transmitted. + """ + raise NotImplementedError() diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py index 26ca035c44..1e575d1a9e 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py @@ -303,16 +303,9 @@ class TransmissionTest(object): invocation_message, links.Ticket.Termination.COMPLETION) self._invocation_link.accept_ticket(original_invocation_ticket) - # TODO(nathaniel): This shouldn't be necessary. Detecting the end of the - # invocation-side ticket sequence shouldn't require granting allowance for - # another payload. self._service_mate.block_until_tickets_satisfy( at_least_n_payloads_received_predicate(1)) service_operation_id = self._service_mate.tickets()[0].operation_id - self._service_link.accept_ticket( - links.Ticket( - service_operation_id, 0, None, None, links.Ticket.Subscription.FULL, - None, 1, None, None, None, None, None, None)) self._service_mate.block_until_tickets_satisfy(terminated) self._assert_is_valid_invocation_sequence( @@ -321,7 +314,7 @@ class TransmissionTest(object): invocation_terminal_metadata, links.Ticket.Termination.COMPLETION) original_service_ticket = links.Ticket( - service_operation_id, 1, None, None, links.Ticket.Subscription.FULL, + service_operation_id, 0, None, None, links.Ticket.Subscription.FULL, timeout, 0, service_initial_metadata, service_payload, service_terminal_metadata, service_code, service_message, links.Ticket.Termination.COMPLETION) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py index 6c2e3346aa..a2bd7107c1 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py @@ -29,9 +29,42 @@ """State and behavior appropriate for use in tests.""" +import logging import threading +import time from grpc.framework.interfaces.links import links +from grpc.framework.interfaces.links import utilities + +# A more-or-less arbitrary limit on the length of raw data values to be logged. +_UNCOMFORTABLY_LONG = 48 + + +def _safe_for_log_ticket(ticket): + """Creates a safe-for-printing-to-the-log ticket for a given ticket. + + Args: + ticket: Any links.Ticket. + + Returns: + A links.Ticket that is as much as can be equal to the given ticket but + possibly features values like the string "<payload of length 972321>" in + place of the actual values of the given ticket. + """ + if isinstance(ticket.payload, (basestring,)): + payload_length = len(ticket.payload) + else: + payload_length = -1 + if payload_length < _UNCOMFORTABLY_LONG: + return ticket + else: + return links.Ticket( + ticket.operation_id, ticket.sequence_number, + ticket.group, ticket.method, ticket.subscription, ticket.timeout, + ticket.allowance, ticket.initial_metadata, + '<payload of length {}>'.format(payload_length), + ticket.terminal_metadata, ticket.code, ticket.message, + ticket.termination) class RecordingLink(links.Link): @@ -64,3 +97,71 @@ class RecordingLink(links.Link): """Returns a copy of the list of all tickets received by this Link.""" with self._condition: return tuple(self._tickets) + + +class _Pipe(object): + """A conduit that logs all tickets passed through it.""" + + def __init__(self, name): + self._lock = threading.Lock() + self._name = name + self._left_mate = utilities.NULL_LINK + self._right_mate = utilities.NULL_LINK + + def accept_left_to_right_ticket(self, ticket): + with self._lock: + logging.warning( + '%s: moving left to right through %s: %s', time.time(), self._name, + _safe_for_log_ticket(ticket)) + try: + self._right_mate.accept_ticket(ticket) + except Exception as e: # pylint: disable=broad-except + logging.exception(e) + + def accept_right_to_left_ticket(self, ticket): + with self._lock: + logging.warning( + '%s: moving right to left through %s: %s', time.time(), self._name, + _safe_for_log_ticket(ticket)) + try: + self._left_mate.accept_ticket(ticket) + except Exception as e: # pylint: disable=broad-except + logging.exception(e) + + def join_left_mate(self, left_mate): + with self._lock: + self._left_mate = utilities.NULL_LINK if left_mate is None else left_mate + + def join_right_mate(self, right_mate): + with self._lock: + self._right_mate = ( + utilities.NULL_LINK if right_mate is None else right_mate) + + +class _Facade(links.Link): + + def __init__(self, accept, join): + self._accept = accept + self._join = join + + def accept_ticket(self, ticket): + self._accept(ticket) + + def join_link(self, link): + self._join(link) + + +def logging_links(name): + """Creates a conduit that logs all tickets passed through it. + + Args: + name: A name to use for the conduit to identify itself in logging output. + + Returns: + Two links.Links, the first of which is the "left" side of the conduit + and the second of which is the "right" side of the conduit. + """ + pipe = _Pipe(name) + left_facade = _Facade(pipe.accept_left_to_right_ticket, pipe.join_left_mate) + right_facade = _Facade(pipe.accept_right_to_left_ticket, pipe.join_right_mate) + return left_facade, right_facade diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py index 925c32720f..a6203cae2d 100644 --- a/src/python/grpcio_test/setup.py +++ b/src/python/grpcio_test/setup.py @@ -48,8 +48,13 @@ _PACKAGE_DIRECTORIES = { _PACKAGE_DATA = { 'grpc_interop': [ - 'credentials/ca.pem', 'credentials/server1.key', - 'credentials/server1.pem',] + 'credentials/ca.pem', + 'credentials/server1.key', + 'credentials/server1.pem', + ], + 'grpc_protoc_plugin': [ + 'test.proto', + ], } _SETUP_REQUIRES = ( @@ -75,5 +80,5 @@ setuptools.setup( package_data=_PACKAGE_DATA, install_requires=_INSTALL_REQUIRES + _SETUP_REQUIRES, setup_requires=_SETUP_REQUIRES, - cmdclass=_COMMAND_CLASS + cmdclass=_COMMAND_CLASS, ) |