diff options
Diffstat (limited to 'src')
29 files changed, 4090 insertions, 1 deletions
diff --git a/src/python/setup.py b/src/python/setup.py index 26019aaf5b..58dc3b17df 100644 --- a/src/python/setup.py +++ b/src/python/setup.py @@ -31,7 +31,39 @@ from distutils import core as _core +_EXTENSION_SOURCES = ( + 'src/_adapter/_c.c', + 'src/_adapter/_call.c', + 'src/_adapter/_channel.c', + 'src/_adapter/_completion_queue.c', + 'src/_adapter/_error.c', + 'src/_adapter/_server.c', +) + +_EXTENSION_INCLUDE_DIRECTORIES = ( + 'src', + # TODO(nathaniel): Can this path specification be made to work? + #'../../include', +) + +_EXTENSION_LIBRARIES = ( + 'gpr', + 'grpc', +) + +_EXTENSION_LIBRARY_DIRECTORIES = ( + # TODO(nathaniel): Can this path specification be made to work? + #'../../libs/dbg', +) + +_EXTENSION_MODULE = _core.Extension( + '_adapter._c', sources=list(_EXTENSION_SOURCES), + include_dirs=_EXTENSION_INCLUDE_DIRECTORIES, + libraries=_EXTENSION_LIBRARIES, + library_dirs=_EXTENSION_LIBRARY_DIRECTORIES) + _PACKAGES=( + '_adapter', '_framework', '_framework.base', '_framework.base.packets', @@ -43,10 +75,12 @@ _PACKAGES=( ) _PACKAGE_DIRECTORIES = { + '_adapter': 'src/_adapter', '_framework': 'src/_framework', '_junkdrawer': 'src/_junkdrawer', } _core.setup( - name='grpc', version='0.0.1', packages=_PACKAGES, + name='grpc', version='0.0.1', + ext_modules=[_EXTENSION_MODULE], packages=_PACKAGES, package_dir=_PACKAGE_DIRECTORIES) diff --git a/src/python/src/_adapter/__init__.py b/src/python/src/_adapter/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/src/_adapter/__init__.py diff --git a/src/python/src/_adapter/_blocking_invocation_inline_service_test.py b/src/python/src/_adapter/_blocking_invocation_inline_service_test.py new file mode 100644 index 0000000000..873ce9a5a4 --- /dev/null +++ b/src/python/src/_adapter/_blocking_invocation_inline_service_test.py @@ -0,0 +1,17 @@ +"""One of the tests of the Face layer of RPC Framework.""" + +import unittest + +from _adapter import _face_test_case +from _framework.face.testing import blocking_invocation_inline_service_test_case as test_case + + +class BlockingInvocationInlineServiceTest( + _face_test_case.FaceTestCase, + test_case.BlockingInvocationInlineServiceTestCase, + unittest.TestCase): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/src/_adapter/_c.c b/src/python/src/_adapter/_c.c new file mode 100644 index 0000000000..d1f7fbb0d5 --- /dev/null +++ b/src/python/src/_adapter/_c.c @@ -0,0 +1,77 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <Python.h> +#include <grpc/grpc.h> + +#include "_adapter/_completion_queue.h" +#include "_adapter/_channel.h" +#include "_adapter/_call.h" +#include "_adapter/_server.h" + +static PyObject *init(PyObject *self, PyObject *args) { + grpc_init(); + Py_RETURN_NONE; +} + +static PyObject *shutdown(PyObject *self, PyObject *args) { + grpc_shutdown(); + Py_RETURN_NONE; +} + +static PyMethodDef _c_methods[] = { + {"init", init, METH_VARARGS, "Initialize the module's static state."}, + {"shut_down", shutdown, METH_VARARGS, + "Shut down the module's static state."}, + {NULL}, +}; + +PyMODINIT_FUNC init_c(void) { + PyObject *module; + + module = Py_InitModule3("_c", _c_methods, + "Wrappings of C structures and functions."); + + if (pygrpc_add_completion_queue(module) == -1) { + return; + } + if (pygrpc_add_channel(module) == -1) { + return; + } + if (pygrpc_add_call(module) == -1) { + return; + } + if (pygrpc_add_server(module) == -1) { + return; + } +} diff --git a/src/python/src/_adapter/_c_test.py b/src/python/src/_adapter/_c_test.py new file mode 100644 index 0000000000..bc0a622cc4 --- /dev/null +++ b/src/python/src/_adapter/_c_test.py @@ -0,0 +1,141 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests for _adapter._c.""" + +import threading +import time +import unittest + +from _adapter import _c +from _adapter import _datatypes + +_TIMEOUT = 3 +_FUTURE = time.time() + 60 * 60 * 24 +_IDEMPOTENCE_DEMONSTRATION = 7 + + +class _CTest(unittest.TestCase): + + def testUpAndDown(self): + _c.init() + _c.shut_down() + + def testCompletionQueue(self): + _c.init() + + completion_queue = _c.CompletionQueue() + event = completion_queue.get(0) + self.assertIsNone(event) + event = completion_queue.get(time.time()) + self.assertIsNone(event) + event = completion_queue.get(time.time() + _TIMEOUT) + self.assertIsNone(event) + completion_queue.stop() + for _ in range(_IDEMPOTENCE_DEMONSTRATION): + event = completion_queue.get(time.time() + _TIMEOUT) + self.assertIs(event.kind, _datatypes.Event.Kind.STOP) + + del completion_queue + del event + + _c.shut_down() + + def testChannel(self): + _c.init() + + channel = _c.Channel('test host:12345') + del channel + + _c.shut_down() + + def testCall(self): + method = 'test method' + host = 'test host' + + _c.init() + + channel = _c.Channel('%s:%d' % (host, 12345)) + call = _c.Call(channel, method, host, time.time() + _TIMEOUT) + del call + del channel + + _c.shut_down() + + def testServer(self): + _c.init() + + completion_queue = _c.CompletionQueue() + server = _c.Server(completion_queue) + server.add_http2_addr('[::]:0') + server.start() + server.stop() + completion_queue.stop() + del server + del completion_queue + + service_tag = object() + completion_queue = _c.CompletionQueue() + server = _c.Server(completion_queue) + server.add_http2_addr('[::]:0') + server.start() + server.service(service_tag) + server.stop() + completion_queue.stop() + event = completion_queue.get(time.time() + _TIMEOUT) + self.assertIs(event.kind, _datatypes.Event.Kind.SERVICE_ACCEPTED) + self.assertIs(event.tag, service_tag) + self.assertIsNone(event.service_acceptance) + for _ in range(_IDEMPOTENCE_DEMONSTRATION): + event = completion_queue.get(time.time() + _TIMEOUT) + self.assertIs(event.kind, _datatypes.Event.Kind.STOP) + del server + del completion_queue + + completion_queue = _c.CompletionQueue() + server = _c.Server(completion_queue) + server.add_http2_addr('[::]:0') + server.start() + thread = threading.Thread(target=completion_queue.get, args=(_FUTURE,)) + thread.start() + time.sleep(1) + server.stop() + completion_queue.stop() + for _ in range(_IDEMPOTENCE_DEMONSTRATION): + event = completion_queue.get(time.time() + _TIMEOUT) + self.assertIs(event.kind, _datatypes.Event.Kind.STOP) + thread.join() + del server + del completion_queue + + _c.shut_down() + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/src/_adapter/_call.c b/src/python/src/_adapter/_call.c new file mode 100644 index 0000000000..1f91090f7d --- /dev/null +++ b/src/python/src/_adapter/_call.c @@ -0,0 +1,292 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "_adapter/_call.h" + +#include <math.h> +#include <Python.h> +#include <grpc/grpc.h> + +#include "_adapter/_channel.h" +#include "_adapter/_completion_queue.h" +#include "_adapter/_error.h" + +static int pygrpc_call_init(Call *self, PyObject *args, PyObject *kwds) { + const PyObject *channel; + const char *method; + const char *host; + const double deadline; + + if (!PyArg_ParseTuple(args, "O!ssd", &pygrpc_ChannelType, &channel, &method, + &host, &deadline)) { + self->c_call = NULL; + return -1; + } + + /* TODO(nathaniel): Hoist the gpr_timespec <-> PyFloat arithmetic into its own + * function with its own test coverage. + */ + self->c_call = + grpc_channel_create_call(((Channel *)channel)->c_channel, method, host, + gpr_time_from_nanos(deadline * GPR_NS_PER_SEC)); + + return 0; +} + +static void pygrpc_call_dealloc(Call *self) { + if (self->c_call != NULL) { + grpc_call_destroy(self->c_call); + } + self->ob_type->tp_free((PyObject *)self); +} + +static const PyObject *pygrpc_call_invoke(Call *self, PyObject *args) { + const PyObject *completion_queue; + const PyObject *metadata_tag; + const PyObject *finish_tag; + grpc_call_error call_error; + const PyObject *result; + + if (!(PyArg_ParseTuple(args, "O!OO", &pygrpc_CompletionQueueType, + &completion_queue, &metadata_tag, &finish_tag))) { + return NULL; + } + + call_error = grpc_call_invoke( + self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue, + (void *)metadata_tag, (void *)finish_tag, 0); + + result = pygrpc_translate_call_error(call_error); + if (result != NULL) { + Py_INCREF(metadata_tag); + Py_INCREF(finish_tag); + } + return result; +} + +static const PyObject *pygrpc_call_write(Call *self, PyObject *args) { + const char *bytes; + int length; + const PyObject *tag; + gpr_slice slice; + grpc_byte_buffer *byte_buffer; + grpc_call_error call_error; + const PyObject *result; + + if (!(PyArg_ParseTuple(args, "s#O", &bytes, &length, &tag))) { + return NULL; + } + + slice = gpr_slice_from_copied_buffer(bytes, length); + byte_buffer = grpc_byte_buffer_create(&slice, 1); + gpr_slice_unref(slice); + + call_error = grpc_call_start_write(self->c_call, byte_buffer, (void *)tag, 0); + + grpc_byte_buffer_destroy(byte_buffer); + + result = pygrpc_translate_call_error(call_error); + if (result != NULL) { + Py_INCREF(tag); + } + return result; +} + +static const PyObject *pygrpc_call_complete(Call *self, PyObject *args) { + const PyObject *tag; + grpc_call_error call_error; + const PyObject *result; + + if (!(PyArg_ParseTuple(args, "O", &tag))) { + return NULL; + } + + call_error = grpc_call_writes_done(self->c_call, (void *)tag); + + result = pygrpc_translate_call_error(call_error); + if (result != NULL) { + Py_INCREF(tag); + } + return result; +} + +static const PyObject *pygrpc_call_accept(Call *self, PyObject *args) { + const PyObject *completion_queue; + const PyObject *tag; + grpc_call_error call_error; + const PyObject *result; + + if (!(PyArg_ParseTuple(args, "O!O", &pygrpc_CompletionQueueType, + &completion_queue, &tag))) { + return NULL; + } + + call_error = grpc_call_server_accept( + self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue, + (void *)tag); + result = pygrpc_translate_call_error(call_error); + + if (result != NULL) { + Py_INCREF(tag); + } + + return result; +} + +static const PyObject *pygrpc_call_premetadata(Call *self, PyObject *args) { + /* TODO(b/18702680): Actually support metadata. */ + return pygrpc_translate_call_error( + grpc_call_server_end_initial_metadata(self->c_call, 0)); +} + +static const PyObject *pygrpc_call_read(Call *self, PyObject *args) { + const PyObject *tag; + grpc_call_error call_error; + const PyObject *result; + + if (!(PyArg_ParseTuple(args, "O", &tag))) { + return NULL; + } + + call_error = grpc_call_start_read(self->c_call, (void *)tag); + + result = pygrpc_translate_call_error(call_error); + if (result != NULL) { + Py_INCREF(tag); + } + return result; +} + +static const PyObject *pygrpc_call_status(Call *self, PyObject *args) { + PyObject *status; + PyObject *code; + PyObject *details; + const PyObject *tag; + grpc_status_code c_code; + char *c_message; + grpc_call_error call_error; + const PyObject *result; + + if (!(PyArg_ParseTuple(args, "OO", &status, &tag))) { + return NULL; + } + + code = PyObject_GetAttrString(status, "code"); + details = PyObject_GetAttrString(status, "details"); + c_code = PyInt_AsLong(code); + c_message = PyBytes_AsString(details); + Py_DECREF(code); + Py_DECREF(details); + + call_error = grpc_call_start_write_status(self->c_call, c_code, c_message, + (void *)tag); + + result = pygrpc_translate_call_error(call_error); + if (result != NULL) { + Py_INCREF(tag); + } + return result; +} + +static const PyObject *pygrpc_call_cancel(Call *self) { + return pygrpc_translate_call_error(grpc_call_cancel(self->c_call)); +} + +static PyMethodDef methods[] = { + {"invoke", (PyCFunction)pygrpc_call_invoke, METH_VARARGS, + "Invoke this call."}, + {"write", (PyCFunction)pygrpc_call_write, METH_VARARGS, + "Write bytes to this call."}, + {"complete", (PyCFunction)pygrpc_call_complete, METH_VARARGS, + "Complete writes to this call."}, + {"accept", (PyCFunction)pygrpc_call_accept, METH_VARARGS, "Accept an RPC."}, + {"premetadata", (PyCFunction)pygrpc_call_premetadata, METH_VARARGS, + "Indicate the end of leading metadata in the response."}, + {"read", (PyCFunction)pygrpc_call_read, METH_VARARGS, + "Read bytes from this call."}, + {"status", (PyCFunction)pygrpc_call_status, METH_VARARGS, + "Report this call's status."}, + {"cancel", (PyCFunction)pygrpc_call_cancel, METH_NOARGS, + "Cancel this call."}, + {NULL}}; + +PyTypeObject pygrpc_CallType = { + PyObject_HEAD_INIT(NULL)0, /*ob_size*/ + "_grpc.Call", /*tp_name*/ + sizeof(Call), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)pygrpc_call_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /*tp_flags*/ + "Wrapping of grpc_call.", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)pygrpc_call_init, /* tp_init */ +}; + +int pygrpc_add_call(PyObject *module) { + pygrpc_CallType.tp_new = PyType_GenericNew; + if (PyType_Ready(&pygrpc_CallType) < 0) { + PyErr_SetString(PyExc_RuntimeError, "Error defining pygrpc_CallType!"); + return -1; + } + if (PyModule_AddObject(module, "Call", (PyObject *)&pygrpc_CallType) == -1) { + PyErr_SetString(PyExc_ImportError, "Couldn't add Call type to module!"); + } + return 0; +} diff --git a/src/python/src/_adapter/_call.h b/src/python/src/_adapter/_call.h new file mode 100644 index 0000000000..a936e23023 --- /dev/null +++ b/src/python/src/_adapter/_call.h @@ -0,0 +1,46 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef _ADAPTER__CALL_H_ +#define _ADAPTER__CALL_H_ + +#include <Python.h> +#include <grpc/grpc.h> + +typedef struct { PyObject_HEAD grpc_call *c_call; } Call; + +PyTypeObject pygrpc_CallType; + +int pygrpc_add_call(PyObject *module); + +#endif /* _ADAPTER__CALL_H_ */ diff --git a/src/python/src/_adapter/_channel.c b/src/python/src/_adapter/_channel.c new file mode 100644 index 0000000000..d41ebd4479 --- /dev/null +++ b/src/python/src/_adapter/_channel.c @@ -0,0 +1,109 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "_adapter/_channel.h" + +#include <Python.h> +#include <grpc/grpc.h> + +static int pygrpc_channel_init(Channel *self, PyObject *args, PyObject *kwds) { + const char *hostport; + + if (!(PyArg_ParseTuple(args, "s", &hostport))) { + self->c_channel = NULL; + return -1; + } + + self->c_channel = grpc_channel_create(hostport, NULL); + return 0; +} + +static void pygrpc_channel_dealloc(Channel *self) { + if (self->c_channel != NULL) { + grpc_channel_destroy(self->c_channel); + } + self->ob_type->tp_free((PyObject *)self); +} + +PyTypeObject pygrpc_ChannelType = { + PyObject_HEAD_INIT(NULL)0, /*ob_size*/ + "_grpc.Channel", /*tp_name*/ + sizeof(Channel), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)pygrpc_channel_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /*tp_flags*/ + "Wrapping of grpc_channel.", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + 0, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)pygrpc_channel_init, /* tp_init */ +}; + +int pygrpc_add_channel(PyObject *module) { + pygrpc_ChannelType.tp_new = PyType_GenericNew; + if (PyType_Ready(&pygrpc_ChannelType) < 0) { + PyErr_SetString(PyExc_RuntimeError, "Error defining pygrpc_ChannelType!"); + return -1; + } + if (PyModule_AddObject(module, "Channel", (PyObject *)&pygrpc_ChannelType) == + -1) { + PyErr_SetString(PyExc_ImportError, "Couldn't add Channel type to module!"); + return -1; + } + return 0; +} diff --git a/src/python/src/_adapter/_channel.h b/src/python/src/_adapter/_channel.h new file mode 100644 index 0000000000..6241ccd02e --- /dev/null +++ b/src/python/src/_adapter/_channel.h @@ -0,0 +1,46 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef _ADAPTER__CHANNEL_H_ +#define _ADAPTER__CHANNEL_H_ + +#include <Python.h> +#include <grpc/grpc.h> + +typedef struct { PyObject_HEAD grpc_channel *c_channel; } Channel; + +PyTypeObject pygrpc_ChannelType; + +int pygrpc_add_channel(PyObject *module); + +#endif /* _ADAPTER__CHANNEL_H_ */ diff --git a/src/python/src/_adapter/_common.py b/src/python/src/_adapter/_common.py new file mode 100644 index 0000000000..492849f4cb --- /dev/null +++ b/src/python/src/_adapter/_common.py @@ -0,0 +1,76 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State used by both invocation-side and service-side code.""" + +import enum + + +@enum.unique +class HighWrite(enum.Enum): + """The possible categories of high-level write state.""" + + OPEN = 'OPEN' + CLOSED = 'CLOSED' + + +class WriteState(object): + """A description of the state of writing to an RPC. + + Attributes: + low: A side-specific value describing the low-level state of writing. + high: A HighWrite value describing the high-level state of writing. + pending: A list of bytestrings for the RPC waiting to be written to the + other side of the RPC. + """ + + def __init__(self, low, high, pending): + self.low = low + self.high = high + self.pending = pending + + +class CommonRPCState(object): + """A description of an RPC's state. + + Attributes: + write: A WriteState describing the state of writing to the RPC. + sequence_number: The lowest-unused sequence number for use in generating + tickets locally describing the progress of the RPC. + deserializer: The behavior to be used to deserialize payload bytestreams + taken off the wire. + serializer: The behavior to be used to serialize payloads to be sent on the + wire. + """ + + def __init__(self, write, sequence_number, deserializer, serializer): + self.write = write + self.sequence_number = sequence_number + self.deserializer = deserializer + self.serializer = serializer diff --git a/src/python/src/_adapter/_completion_queue.c b/src/python/src/_adapter/_completion_queue.c new file mode 100644 index 0000000000..7c951d24a0 --- /dev/null +++ b/src/python/src/_adapter/_completion_queue.c @@ -0,0 +1,541 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "_adapter/_completion_queue.h" + +#include <Python.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> + +#include "_adapter/_call.h" + +static PyObject *status_class; +static PyObject *service_acceptance_class; +static PyObject *event_class; + +static PyObject *ok_status_code; +static PyObject *cancelled_status_code; +static PyObject *unknown_status_code; +static PyObject *invalid_argument_status_code; +static PyObject *expired_status_code; +static PyObject *not_found_status_code; +static PyObject *already_exists_status_code; +static PyObject *permission_denied_status_code; +static PyObject *unauthenticated_status_code; +static PyObject *resource_exhausted_status_code; +static PyObject *failed_precondition_status_code; +static PyObject *aborted_status_code; +static PyObject *out_of_range_status_code; +static PyObject *unimplemented_status_code; +static PyObject *internal_error_status_code; +static PyObject *unavailable_status_code; +static PyObject *data_loss_status_code; + +static PyObject *stop_event_kind; +static PyObject *write_event_kind; +static PyObject *complete_event_kind; +static PyObject *service_event_kind; +static PyObject *read_event_kind; +static PyObject *metadata_event_kind; +static PyObject *finish_event_kind; + +static PyObject *pygrpc_as_py_time(gpr_timespec *timespec) { + return Py_BuildValue("f", + timespec->tv_sec + ((double)timespec->tv_nsec) / 1.0E9); +} + +static PyObject *pygrpc_status_code(grpc_status_code c_status_code) { + switch (c_status_code) { + case GRPC_STATUS_OK: + return ok_status_code; + case GRPC_STATUS_CANCELLED: + return cancelled_status_code; + case GRPC_STATUS_UNKNOWN: + return unknown_status_code; + case GRPC_STATUS_INVALID_ARGUMENT: + return invalid_argument_status_code; + case GRPC_STATUS_DEADLINE_EXCEEDED: + return expired_status_code; + case GRPC_STATUS_NOT_FOUND: + return not_found_status_code; + case GRPC_STATUS_ALREADY_EXISTS: + return already_exists_status_code; + case GRPC_STATUS_PERMISSION_DENIED: + return permission_denied_status_code; + case GRPC_STATUS_UNAUTHENTICATED: + return unauthenticated_status_code; + case GRPC_STATUS_RESOURCE_EXHAUSTED: + return resource_exhausted_status_code; + case GRPC_STATUS_FAILED_PRECONDITION: + return failed_precondition_status_code; + case GRPC_STATUS_ABORTED: + return aborted_status_code; + case GRPC_STATUS_OUT_OF_RANGE: + return out_of_range_status_code; + case GRPC_STATUS_UNIMPLEMENTED: + return unimplemented_status_code; + case GRPC_STATUS_INTERNAL: + return internal_error_status_code; + case GRPC_STATUS_UNAVAILABLE: + return unavailable_status_code; + case GRPC_STATUS_DATA_LOSS: + return data_loss_status_code; + default: + return NULL; + } +} + +static PyObject *pygrpc_stop_event_args(grpc_event *c_event) { + return Py_BuildValue("(OOOOOOO)", stop_event_kind, Py_None, Py_None, Py_None, + Py_None, Py_None, Py_None); +} + +static PyObject *pygrpc_write_event_args(grpc_event *c_event) { + PyObject *write_accepted = + c_event->data.write_accepted == GRPC_OP_OK ? Py_True : Py_False; + return Py_BuildValue("(OOOOOOO)", write_event_kind, (PyObject *)c_event->tag, + write_accepted, Py_None, Py_None, Py_None, Py_None); +} + +static PyObject *pygrpc_complete_event_args(grpc_event *c_event) { + PyObject *complete_accepted = + c_event->data.finish_accepted == GRPC_OP_OK ? Py_True : Py_False; + return Py_BuildValue("(OOOOOOO)", complete_event_kind, + (PyObject *)c_event->tag, Py_None, complete_accepted, + Py_None, Py_None, Py_None); +} + +static PyObject *pygrpc_service_event_args(grpc_event *c_event) { + if (c_event->data.server_rpc_new.method == NULL) { + return Py_BuildValue("(OOOOOOO)", service_event_kind, c_event->tag, + Py_None, Py_None, Py_None, Py_None, Py_None); + } else { + PyObject *method = PyBytes_FromString(c_event->data.server_rpc_new.method); + PyObject *host = PyBytes_FromString(c_event->data.server_rpc_new.host); + PyObject *service_deadline = + pygrpc_as_py_time(&c_event->data.server_rpc_new.deadline); + + Call *call; + PyObject *service_acceptance_args; + PyObject *service_acceptance; + PyObject *event_args; + + call = PyObject_New(Call, &pygrpc_CallType); + call->c_call = c_event->call; + + service_acceptance_args = + Py_BuildValue("(OOOO)", call, method, host, service_deadline); + Py_DECREF(call); + Py_DECREF(method); + Py_DECREF(host); + Py_DECREF(service_deadline); + + service_acceptance = + PyObject_CallObject(service_acceptance_class, service_acceptance_args); + Py_DECREF(service_acceptance_args); + + event_args = Py_BuildValue("(OOOOOOO)", service_event_kind, + (PyObject *)c_event->tag, Py_None, Py_None, + service_acceptance, Py_None, Py_None); + Py_DECREF(service_acceptance); + return event_args; + } +} + +static PyObject *pygrpc_read_event_args(grpc_event *c_event) { + if (c_event->data.read == NULL) { + return Py_BuildValue("(OOOOOOO)", read_event_kind, + (PyObject *)c_event->tag, Py_None, Py_None, Py_None, + Py_None, Py_None); + } else { + size_t length; + size_t offset; + grpc_byte_buffer_reader *reader; + gpr_slice slice; + char *c_bytes; + PyObject *bytes; + PyObject *event_args; + + length = grpc_byte_buffer_length(c_event->data.read); + reader = grpc_byte_buffer_reader_create(c_event->data.read); + c_bytes = gpr_malloc(length); + offset = 0; + while (grpc_byte_buffer_reader_next(reader, &slice)) { + memcpy(c_bytes + offset, GPR_SLICE_START_PTR(slice), + GPR_SLICE_LENGTH(slice)); + offset += GPR_SLICE_LENGTH(slice); + } + grpc_byte_buffer_reader_destroy(reader); + bytes = PyBytes_FromStringAndSize(c_bytes, length); + gpr_free(c_bytes); + event_args = + Py_BuildValue("(OOOOOOO)", read_event_kind, (PyObject *)c_event->tag, + Py_None, Py_None, Py_None, bytes, Py_None); + Py_DECREF(bytes); + return event_args; + } +} + +static PyObject *pygrpc_metadata_event_args(grpc_event *c_event) { + /* TODO(nathaniel): Actual transmission of metadata. */ + return Py_BuildValue("(OOOOOOO)", metadata_event_kind, + (PyObject *)c_event->tag, Py_None, Py_None, Py_None, + Py_None, Py_None); +} + +static PyObject *pygrpc_finished_event_args(grpc_event *c_event) { + PyObject *code; + PyObject *details; + PyObject *status_args; + PyObject *status; + PyObject *event_args; + + code = pygrpc_status_code(c_event->data.finished.status); + if (code == NULL) { + PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!"); + return NULL; + } + if (c_event->data.finished.details == NULL) { + details = PyBytes_FromString(""); + } else { + details = PyBytes_FromString(c_event->data.finished.details); + } + status_args = Py_BuildValue("(OO)", code, details); + Py_DECREF(details); + status = PyObject_CallObject(status_class, status_args); + Py_DECREF(status_args); + event_args = + Py_BuildValue("(OOOOOOO)", finish_event_kind, (PyObject *)c_event->tag, + Py_None, Py_None, Py_None, Py_None, status); + Py_DECREF(status); + return event_args; +} + +static int pygrpc_completion_queue_init(CompletionQueue *self, PyObject *args, + PyObject *kwds) { + self->c_completion_queue = grpc_completion_queue_create(); + return 0; +} + +static void pygrpc_completion_queue_dealloc(CompletionQueue *self) { + grpc_completion_queue_destroy(self->c_completion_queue); + self->ob_type->tp_free((PyObject *)self); +} + +static PyObject *pygrpc_completion_queue_get(CompletionQueue *self, + PyObject *args) { + PyObject *deadline; + double double_deadline; + gpr_timespec deadline_timespec; + grpc_event *c_event; + + PyObject *event_args; + PyObject *event; + + if (!(PyArg_ParseTuple(args, "O", &deadline))) { + return NULL; + } + + if (deadline == Py_None) { + deadline_timespec = gpr_inf_future; + } else { + double_deadline = PyFloat_AsDouble(deadline); + deadline_timespec = gpr_time_from_nanos((long)(double_deadline * 1.0E9)); + } + + /* TODO(nathaniel): Suppress clang-format in this block and remove the + unnecessary and unPythonic semicolons trailing the _ALLOW_THREADS macros. + (Right now clang-format only understands //-demarcated suppressions.) */ + Py_BEGIN_ALLOW_THREADS; + c_event = + grpc_completion_queue_next(self->c_completion_queue, deadline_timespec); + Py_END_ALLOW_THREADS; + + if (c_event == NULL) { + Py_RETURN_NONE; + } + + switch (c_event->type) { + case GRPC_QUEUE_SHUTDOWN: + event_args = pygrpc_stop_event_args(c_event); + break; + case GRPC_WRITE_ACCEPTED: + event_args = pygrpc_write_event_args(c_event); + break; + case GRPC_FINISH_ACCEPTED: + event_args = pygrpc_complete_event_args(c_event); + break; + case GRPC_SERVER_RPC_NEW: + event_args = pygrpc_service_event_args(c_event); + break; + case GRPC_READ: + event_args = pygrpc_read_event_args(c_event); + break; + case GRPC_CLIENT_METADATA_READ: + event_args = pygrpc_metadata_event_args(c_event); + break; + case GRPC_FINISHED: + event_args = pygrpc_finished_event_args(c_event); + break; + default: + PyErr_SetString(PyExc_Exception, "Unrecognized event type!"); + return NULL; + } + + if (event_args == NULL) { + return NULL; + } + + event = PyObject_CallObject(event_class, event_args); + + Py_DECREF(event_args); + Py_XDECREF((PyObject *)c_event->tag); + grpc_event_finish(c_event); + + return event; +} + +static PyObject *pygrpc_completion_queue_stop(CompletionQueue *self) { + grpc_completion_queue_shutdown(self->c_completion_queue); + + Py_RETURN_NONE; +} + +static PyMethodDef methods[] = { + {"get", (PyCFunction)pygrpc_completion_queue_get, METH_VARARGS, + "Get the next event."}, + {"stop", (PyCFunction)pygrpc_completion_queue_stop, METH_NOARGS, + "Stop this completion queue."}, + {NULL}}; + +PyTypeObject pygrpc_CompletionQueueType = { + PyObject_HEAD_INIT(NULL)0, /*ob_size*/ + "_gprc.CompletionQueue", /*tp_name*/ + sizeof(CompletionQueue), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)pygrpc_completion_queue_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /*tp_flags*/ + "Wrapping of grpc_completion_queue.", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)pygrpc_completion_queue_init, /* tp_init */ +}; + +static int pygrpc_get_status_codes(PyObject *datatypes_module) { + PyObject *code_class = PyObject_GetAttrString(datatypes_module, "Code"); + if (code_class == NULL) { + return -1; + } + ok_status_code = PyObject_GetAttrString(code_class, "OK"); + if (ok_status_code == NULL) { + return -1; + } + cancelled_status_code = PyObject_GetAttrString(code_class, "CANCELLED"); + if (cancelled_status_code == NULL) { + return -1; + } + unknown_status_code = PyObject_GetAttrString(code_class, "UNKNOWN"); + if (unknown_status_code == NULL) { + return -1; + } + invalid_argument_status_code = + PyObject_GetAttrString(code_class, "INVALID_ARGUMENT"); + if (invalid_argument_status_code == NULL) { + return -1; + } + expired_status_code = PyObject_GetAttrString(code_class, "EXPIRED"); + if (expired_status_code == NULL) { + return -1; + } + not_found_status_code = PyObject_GetAttrString(code_class, "NOT_FOUND"); + if (not_found_status_code == NULL) { + return -1; + } + already_exists_status_code = + PyObject_GetAttrString(code_class, "ALREADY_EXISTS"); + if (already_exists_status_code == NULL) { + return -1; + } + permission_denied_status_code = + PyObject_GetAttrString(code_class, "PERMISSION_DENIED"); + if (permission_denied_status_code == NULL) { + return -1; + } + unauthenticated_status_code = + PyObject_GetAttrString(code_class, "UNAUTHENTICATED"); + if (unauthenticated_status_code == NULL) { + return -1; + } + resource_exhausted_status_code = + PyObject_GetAttrString(code_class, "RESOURCE_EXHAUSTED"); + if (resource_exhausted_status_code == NULL) { + return -1; + } + failed_precondition_status_code = + PyObject_GetAttrString(code_class, "FAILED_PRECONDITION"); + if (failed_precondition_status_code == NULL) { + return -1; + } + aborted_status_code = PyObject_GetAttrString(code_class, "ABORTED"); + if (aborted_status_code == NULL) { + return -1; + } + out_of_range_status_code = PyObject_GetAttrString(code_class, "OUT_OF_RANGE"); + if (out_of_range_status_code == NULL) { + return -1; + } + unimplemented_status_code = + PyObject_GetAttrString(code_class, "UNIMPLEMENTED"); + if (unimplemented_status_code == NULL) { + return -1; + } + internal_error_status_code = + PyObject_GetAttrString(code_class, "INTERNAL_ERROR"); + if (internal_error_status_code == NULL) { + return -1; + } + unavailable_status_code = PyObject_GetAttrString(code_class, "UNAVAILABLE"); + if (unavailable_status_code == NULL) { + return -1; + } + data_loss_status_code = PyObject_GetAttrString(code_class, "DATA_LOSS"); + if (data_loss_status_code == NULL) { + return -1; + } + Py_DECREF(code_class); + return 0; +} + +static int pygrpc_get_event_kinds(PyObject *event_class) { + PyObject *kind_class = PyObject_GetAttrString(event_class, "Kind"); + if (kind_class == NULL) { + return -1; + } + stop_event_kind = PyObject_GetAttrString(kind_class, "STOP"); + if (stop_event_kind == NULL) { + return -1; + } + write_event_kind = PyObject_GetAttrString(kind_class, "WRITE_ACCEPTED"); + if (write_event_kind == NULL) { + return -1; + } + complete_event_kind = PyObject_GetAttrString(kind_class, "COMPLETE_ACCEPTED"); + if (complete_event_kind == NULL) { + return -1; + } + service_event_kind = PyObject_GetAttrString(kind_class, "SERVICE_ACCEPTED"); + if (service_event_kind == NULL) { + return -1; + } + read_event_kind = PyObject_GetAttrString(kind_class, "READ_ACCEPTED"); + if (read_event_kind == NULL) { + return -1; + } + metadata_event_kind = PyObject_GetAttrString(kind_class, "METADATA_ACCEPTED"); + if (metadata_event_kind == NULL) { + return -1; + } + finish_event_kind = PyObject_GetAttrString(kind_class, "FINISH"); + if (finish_event_kind == NULL) { + return -1; + } + Py_DECREF(kind_class); + return 0; +} + +int pygrpc_add_completion_queue(PyObject *module) { + char *datatypes_module_path = "_adapter._datatypes"; + PyObject *datatypes_module = PyImport_ImportModule(datatypes_module_path); + if (datatypes_module == NULL) { + PyErr_SetString(PyExc_ImportError, datatypes_module_path); + return -1; + } + status_class = PyObject_GetAttrString(datatypes_module, "Status"); + service_acceptance_class = + PyObject_GetAttrString(datatypes_module, "ServiceAcceptance"); + event_class = PyObject_GetAttrString(datatypes_module, "Event"); + if (status_class == NULL || service_acceptance_class == NULL || + event_class == NULL) { + PyErr_SetString(PyExc_ImportError, "Missing classes in _datatypes module!"); + return -1; + } + if (pygrpc_get_status_codes(datatypes_module) == -1) { + PyErr_SetString(PyExc_ImportError, "Status codes import broken!"); + return -1; + } + if (pygrpc_get_event_kinds(event_class) == -1) { + PyErr_SetString(PyExc_ImportError, "Event kinds import broken!"); + return -1; + } + Py_DECREF(datatypes_module); + + pygrpc_CompletionQueueType.tp_new = PyType_GenericNew; + if (PyType_Ready(&pygrpc_CompletionQueueType) < 0) { + PyErr_SetString(PyExc_RuntimeError, + "Error defining pygrpc_CompletionQueueType!"); + return -1; + } + if (PyModule_AddObject(module, "CompletionQueue", + (PyObject *)&pygrpc_CompletionQueueType) == -1) { + PyErr_SetString(PyExc_ImportError, + "Couldn't add CompletionQueue type to module!"); + return -1; + } + return 0; +} diff --git a/src/python/src/_adapter/_completion_queue.h b/src/python/src/_adapter/_completion_queue.h new file mode 100644 index 0000000000..8e5ee9f406 --- /dev/null +++ b/src/python/src/_adapter/_completion_queue.h @@ -0,0 +1,48 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef _ADAPTER__COMPLETION_QUEUE_H_ +#define _ADAPTER__COMPLETION_QUEUE_H_ + +#include <Python.h> +#include <grpc/grpc.h> + +typedef struct { + PyObject_HEAD grpc_completion_queue *c_completion_queue; +} CompletionQueue; + +PyTypeObject pygrpc_CompletionQueueType; + +int pygrpc_add_completion_queue(PyObject *module); + +#endif /* _ADAPTER__COMPLETION_QUEUE_H_ */ diff --git a/src/python/src/_adapter/_datatypes.py b/src/python/src/_adapter/_datatypes.py new file mode 100644 index 0000000000..e271ec83b9 --- /dev/null +++ b/src/python/src/_adapter/_datatypes.py @@ -0,0 +1,86 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Datatypes passed between Python and C code.""" + +import collections +import enum + + +@enum.unique +class Code(enum.IntEnum): + """One Platform error codes (see status.h and codes.proto).""" + + OK = 0 + CANCELLED = 1 + UNKNOWN = 2 + INVALID_ARGUMENT = 3 + EXPIRED = 4 + NOT_FOUND = 5 + ALREADY_EXISTS = 6 + PERMISSION_DENIED = 7 + UNAUTHENTICATED = 16 + RESOURCE_EXHAUSTED = 8 + FAILED_PRECONDITION = 9 + ABORTED = 10 + OUT_OF_RANGE = 11 + UNIMPLEMENTED = 12 + INTERNAL_ERROR = 13 + UNAVAILABLE = 14 + DATA_LOSS = 15 + + +class Status(collections.namedtuple('Status', ['code', 'details'])): + """Describes an RPC's overall status.""" + + +class ServiceAcceptance( + collections.namedtuple( + 'ServiceAcceptance', ['call', 'method', 'host', 'deadline'])): + """Describes an RPC on the service side at the start of service.""" + + +class Event( + collections.namedtuple( + 'Event', + ['kind', 'tag', 'write_accepted', 'complete_accepted', + 'service_acceptance', 'bytes', 'status'])): + """Describes an event emitted from a completion queue.""" + + @enum.unique + class Kind(enum.Enum): + """Describes the kind of an event.""" + + STOP = object() + WRITE_ACCEPTED = object() + COMPLETE_ACCEPTED = object() + SERVICE_ACCEPTED = object() + READ_ACCEPTED = object() + METADATA_ACCEPTED = object() + FINISH = object() diff --git a/src/python/src/_adapter/_error.c b/src/python/src/_adapter/_error.c new file mode 100644 index 0000000000..8c04f4bcea --- /dev/null +++ b/src/python/src/_adapter/_error.c @@ -0,0 +1,79 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "_adapter/_error.h" + +#include <Python.h> +#include <grpc/grpc.h> + +const PyObject *pygrpc_translate_call_error(grpc_call_error call_error) { + switch (call_error) { + case GRPC_CALL_OK: + Py_RETURN_NONE; + case GRPC_CALL_ERROR: + PyErr_SetString(PyExc_Exception, "Defect: unknown defect!"); + return NULL; + case GRPC_CALL_ERROR_NOT_ON_SERVER: + PyErr_SetString(PyExc_Exception, + "Defect: client-only method called on server!"); + return NULL; + case GRPC_CALL_ERROR_NOT_ON_CLIENT: + PyErr_SetString(PyExc_Exception, + "Defect: server-only method called on client!"); + return NULL; + case GRPC_CALL_ERROR_ALREADY_ACCEPTED: + PyErr_SetString(PyExc_Exception, + "Defect: attempted to accept already-accepted call!"); + return NULL; + case GRPC_CALL_ERROR_ALREADY_INVOKED: + PyErr_SetString(PyExc_Exception, + "Defect: attempted to invoke already-invoked call!"); + return NULL; + case GRPC_CALL_ERROR_NOT_INVOKED: + PyErr_SetString(PyExc_Exception, "Defect: Call not yet invoked!"); + return NULL; + case GRPC_CALL_ERROR_ALREADY_FINISHED: + PyErr_SetString(PyExc_Exception, "Defect: Call already finished!"); + return NULL; + case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: + PyErr_SetString(PyExc_Exception, + "Defect: Attempted extra read or extra write on call!"); + return NULL; + case GRPC_CALL_ERROR_INVALID_FLAGS: + PyErr_SetString(PyExc_Exception, "Defect: invalid flags!"); + return NULL; + default: + PyErr_SetString(PyExc_Exception, "Defect: Unknown call error!"); + return NULL; + } +} diff --git a/src/python/src/_adapter/_error.h b/src/python/src/_adapter/_error.h new file mode 100644 index 0000000000..6988b1c95e --- /dev/null +++ b/src/python/src/_adapter/_error.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef _ADAPTER__ERROR_H_ +#define _ADAPTER__ERROR_H_ + +#include <Python.h> +#include <grpc/grpc.h> + +const PyObject *pygrpc_translate_call_error(grpc_call_error call_error); + +#endif /* _ADAPTER__ERROR_H_ */ diff --git a/src/python/src/_adapter/_event_invocation_synchronous_event_service_test.py b/src/python/src/_adapter/_event_invocation_synchronous_event_service_test.py new file mode 100644 index 0000000000..69d91ec7da --- /dev/null +++ b/src/python/src/_adapter/_event_invocation_synchronous_event_service_test.py @@ -0,0 +1,46 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""One of the tests of the Face layer of RPC Framework.""" + +import unittest + +from _adapter import _face_test_case +from _framework.face.testing import event_invocation_synchronous_event_service_test_case as test_case + + +class EventInvocationSynchronousEventServiceTest( + _face_test_case.FaceTestCase, + test_case.EventInvocationSynchronousEventServiceTestCase, + unittest.TestCase): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/src/_adapter/_face_test_case.py b/src/python/src/_adapter/_face_test_case.py new file mode 100644 index 0000000000..112dcfb928 --- /dev/null +++ b/src/python/src/_adapter/_face_test_case.py @@ -0,0 +1,124 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Common construction and destruction for GRPC-backed Face-layer tests.""" + +import unittest + +from _adapter import fore +from _adapter import rear +from _framework.base import util +from _framework.base.packets import implementations as tickets_implementations +from _framework.face import implementations as face_implementations +from _framework.face.testing import coverage +from _framework.face.testing import serial +from _framework.face.testing import test_case +from _framework.foundation import logging_pool + +_TIMEOUT = 3 +_MAXIMUM_TIMEOUT = 90 +_MAXIMUM_POOL_SIZE = 400 + + +class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage): + """Provides abstract Face-layer tests a GRPC-backed implementation.""" + + def set_up_implementation( + self, + name, + methods, + inline_value_in_value_out_methods, + inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods, + event_value_in_value_out_methods, + event_value_in_stream_out_methods, + event_stream_in_value_out_methods, + event_stream_in_stream_out_methods, + multi_method): + pool = logging_pool.pool(_MAXIMUM_POOL_SIZE) + + servicer = face_implementations.servicer( + pool, + inline_value_in_value_out_methods=inline_value_in_value_out_methods, + inline_value_in_stream_out_methods=inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods=inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods, + event_value_in_value_out_methods=event_value_in_value_out_methods, + event_value_in_stream_out_methods=event_value_in_stream_out_methods, + event_stream_in_value_out_methods=event_stream_in_value_out_methods, + event_stream_in_stream_out_methods=event_stream_in_stream_out_methods, + multi_method=multi_method) + + serialization = serial.serialization(methods) + + fore_link = fore.ForeLink( + pool, serialization.request_deserializers, + serialization.response_serializers) + port = fore_link.start() + rear_link = rear.RearLink( + 'localhost', port, pool, + serialization.request_serializers, serialization.response_deserializers) + rear_link.start() + front = tickets_implementations.front(pool, pool, pool) + back = tickets_implementations.back( + servicer, pool, pool, pool, _TIMEOUT, _MAXIMUM_TIMEOUT) + fore_link.join_rear_link(back) + back.join_fore_link(fore_link) + rear_link.join_fore_link(front) + front.join_rear_link(rear_link) + + server = face_implementations.server() + stub = face_implementations.stub(front, pool) + return server, stub, (rear_link, fore_link, front, back) + + def tear_down_implementation(self, memo): + rear_link, fore_link, front, back = memo + # TODO(nathaniel): Waiting for the front and back to idle possibly should + # not be necessary - investigate as part of graceful shutdown work. + util.wait_for_idle(front) + util.wait_for_idle(back) + rear_link.stop() + fore_link.stop() + + @unittest.skip('Service-side failure not transmitted by GRPC.') + def testFailedUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @unittest.skip('Service-side failure not transmitted by GRPC.') + def testFailedUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @unittest.skip('Service-side failure not transmitted by GRPC.') + def testFailedStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @unittest.skip('Service-side failure not transmitted by GRPC.') + def testFailedStreamRequestStreamResponse(self): + raise NotImplementedError() diff --git a/src/python/src/_adapter/_future_invocation_asynchronous_event_service_test.py b/src/python/src/_adapter/_future_invocation_asynchronous_event_service_test.py new file mode 100644 index 0000000000..3db39dd154 --- /dev/null +++ b/src/python/src/_adapter/_future_invocation_asynchronous_event_service_test.py @@ -0,0 +1,46 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""One of the tests of the Face layer of RPC Framework.""" + +import unittest + +from _adapter import _face_test_case +from _framework.face.testing import future_invocation_asynchronous_event_service_test_case as test_case + + +class FutureInvocationAsynchronousEventServiceTest( + _face_test_case.FaceTestCase, + test_case.FutureInvocationAsynchronousEventServiceTestCase, + unittest.TestCase): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/src/_adapter/_links_test.py b/src/python/src/_adapter/_links_test.py new file mode 100644 index 0000000000..94f17d007b --- /dev/null +++ b/src/python/src/_adapter/_links_test.py @@ -0,0 +1,245 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test of the GRPC-backed ForeLink and RearLink.""" + +import threading +import unittest + +from _adapter import _proto_scenarios +from _adapter import _test_links +from _adapter import fore +from _adapter import rear +from _framework.base import interfaces +from _framework.base.packets import packets as tickets +from _framework.foundation import logging_pool + +_IDENTITY = lambda x: x +_TIMEOUT = 2 + + +class RoundTripTest(unittest.TestCase): + + def setUp(self): + self.fore_link_pool = logging_pool.pool(80) + self.rear_link_pool = logging_pool.pool(80) + + def tearDown(self): + self.rear_link_pool.shutdown(wait=True) + self.fore_link_pool.shutdown(wait=True) + + def testZeroMessageRoundTrip(self): + test_operation_id = object() + test_method = 'test method' + test_fore_link = _test_links.ForeLink(None, None) + def rear_action(front_to_back_ticket, fore_link): + if front_to_back_ticket.kind in ( + tickets.Kind.COMPLETION, tickets.Kind.ENTIRE): + back_to_front_ticket = tickets.BackToFrontPacket( + front_to_back_ticket.operation_id, 0, tickets.Kind.COMPLETION, None) + fore_link.accept_back_to_front_ticket(back_to_front_ticket) + test_rear_link = _test_links.RearLink(rear_action, None) + + fore_link = fore.ForeLink( + self.fore_link_pool, {test_method: None}, {test_method: None}) + fore_link.join_rear_link(test_rear_link) + test_rear_link.join_fore_link(fore_link) + port = fore_link.start() + + rear_link = rear.RearLink( + 'localhost', port, self.rear_link_pool, {test_method: None}, + {test_method: None}) + rear_link.join_fore_link(test_fore_link) + test_fore_link.join_rear_link(rear_link) + rear_link.start() + + front_to_back_ticket = tickets.FrontToBackPacket( + test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL, + None, None, _TIMEOUT) + rear_link.accept_front_to_back_ticket(front_to_back_ticket) + + with test_fore_link.condition: + while (not test_fore_link.tickets or + test_fore_link.tickets[-1].kind is tickets.Kind.CONTINUATION): + test_fore_link.condition.wait() + + rear_link.stop() + fore_link.stop() + + with test_fore_link.condition: + self.assertIs(test_fore_link.tickets[-1].kind, tickets.Kind.COMPLETION) + + def testEntireRoundTrip(self): + test_operation_id = object() + test_method = 'test method' + test_front_to_back_datum = b'\x07' + test_back_to_front_datum = b'\x08' + test_fore_link = _test_links.ForeLink(None, None) + rear_sequence_number = [0] + def rear_action(front_to_back_ticket, fore_link): + if front_to_back_ticket.payload is None: + payload = None + else: + payload = test_back_to_front_datum + terminal = front_to_back_ticket.kind in ( + tickets.Kind.COMPLETION, tickets.Kind.ENTIRE) + if payload is not None or terminal: + back_to_front_ticket = tickets.BackToFrontPacket( + front_to_back_ticket.operation_id, rear_sequence_number[0], + tickets.Kind.COMPLETION if terminal else tickets.Kind.CONTINUATION, + payload) + rear_sequence_number[0] += 1 + fore_link.accept_back_to_front_ticket(back_to_front_ticket) + test_rear_link = _test_links.RearLink(rear_action, None) + + fore_link = fore.ForeLink( + self.fore_link_pool, {test_method: _IDENTITY}, + {test_method: _IDENTITY}) + fore_link.join_rear_link(test_rear_link) + test_rear_link.join_fore_link(fore_link) + port = fore_link.start() + + rear_link = rear.RearLink( + 'localhost', port, self.rear_link_pool, {test_method: _IDENTITY}, + {test_method: _IDENTITY}) + rear_link.join_fore_link(test_fore_link) + test_fore_link.join_rear_link(rear_link) + rear_link.start() + + front_to_back_ticket = tickets.FrontToBackPacket( + test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL, + None, test_front_to_back_datum, _TIMEOUT) + rear_link.accept_front_to_back_ticket(front_to_back_ticket) + + with test_fore_link.condition: + while (not test_fore_link.tickets or + test_fore_link.tickets[-1].kind is not tickets.Kind.COMPLETION): + test_fore_link.condition.wait() + + rear_link.stop() + fore_link.stop() + + with test_rear_link.condition: + front_to_back_payloads = tuple( + ticket.payload for ticket in test_rear_link.tickets + if ticket.payload is not None) + with test_fore_link.condition: + back_to_front_payloads = tuple( + ticket.payload for ticket in test_fore_link.tickets + if ticket.payload is not None) + self.assertTupleEqual((test_front_to_back_datum,), front_to_back_payloads) + self.assertTupleEqual((test_back_to_front_datum,), back_to_front_payloads) + + def _perform_scenario_test(self, scenario): + test_operation_id = object() + test_method = scenario.method() + test_fore_link = _test_links.ForeLink(None, None) + rear_lock = threading.Lock() + rear_sequence_number = [0] + def rear_action(front_to_back_ticket, fore_link): + with rear_lock: + if front_to_back_ticket.payload is not None: + response = scenario.response_for_request(front_to_back_ticket.payload) + else: + response = None + terminal = front_to_back_ticket.kind in ( + tickets.Kind.COMPLETION, tickets.Kind.ENTIRE) + if response is not None or terminal: + back_to_front_ticket = tickets.BackToFrontPacket( + front_to_back_ticket.operation_id, rear_sequence_number[0], + tickets.Kind.COMPLETION if terminal else tickets.Kind.CONTINUATION, + response) + rear_sequence_number[0] += 1 + fore_link.accept_back_to_front_ticket(back_to_front_ticket) + test_rear_link = _test_links.RearLink(rear_action, None) + + fore_link = fore.ForeLink( + self.fore_link_pool, {test_method: scenario.deserialize_request}, + {test_method: scenario.serialize_response}) + fore_link.join_rear_link(test_rear_link) + test_rear_link.join_fore_link(fore_link) + port = fore_link.start() + + rear_link = rear.RearLink( + 'localhost', port, self.rear_link_pool, + {test_method: scenario.serialize_request}, + {test_method: scenario.deserialize_response}) + rear_link.join_fore_link(test_fore_link) + test_fore_link.join_rear_link(rear_link) + rear_link.start() + + commencement_ticket = tickets.FrontToBackPacket( + test_operation_id, 0, tickets.Kind.COMMENCEMENT, test_method, + interfaces.FULL, None, None, _TIMEOUT) + fore_sequence_number = 1 + rear_link.accept_front_to_back_ticket(commencement_ticket) + for request in scenario.requests(): + continuation_ticket = tickets.FrontToBackPacket( + test_operation_id, fore_sequence_number, tickets.Kind.CONTINUATION, + None, None, None, request, None) + fore_sequence_number += 1 + rear_link.accept_front_to_back_ticket(continuation_ticket) + completion_ticket = tickets.FrontToBackPacket( + test_operation_id, fore_sequence_number, tickets.Kind.COMPLETION, None, + None, None, None, None) + fore_sequence_number += 1 + rear_link.accept_front_to_back_ticket(completion_ticket) + + with test_fore_link.condition: + while (not test_fore_link.tickets or + test_fore_link.tickets[-1].kind is not tickets.Kind.COMPLETION): + test_fore_link.condition.wait() + + rear_link.stop() + fore_link.stop() + + with test_rear_link.condition: + requests = tuple( + ticket.payload for ticket in test_rear_link.tickets + if ticket.payload is not None) + with test_fore_link.condition: + responses = tuple( + ticket.payload for ticket in test_fore_link.tickets + if ticket.payload is not None) + self.assertTrue(scenario.verify_requests(requests)) + self.assertTrue(scenario.verify_responses(responses)) + + def testEmptyScenario(self): + self._perform_scenario_test(_proto_scenarios.EmptyScenario()) + + def testBidirectionallyUnaryScenario(self): + self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario()) + + def testBidirectionallyStreamingScenario(self): + self._perform_scenario_test( + _proto_scenarios.BidirectionallyStreamingScenario()) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/src/_adapter/_lonely_rear_link_test.py b/src/python/src/_adapter/_lonely_rear_link_test.py new file mode 100644 index 0000000000..18279e05a2 --- /dev/null +++ b/src/python/src/_adapter/_lonely_rear_link_test.py @@ -0,0 +1,97 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A test of invocation-side code unconnected to an RPC server.""" + +import unittest + +from _adapter import _test_links +from _adapter import rear +from _framework.base import interfaces +from _framework.base.packets import packets +from _framework.foundation import logging_pool + +_IDENTITY = lambda x: x +_TIMEOUT = 2 + + +class LonelyRearLinkTest(unittest.TestCase): + + def setUp(self): + self.pool = logging_pool.pool(80) + + def tearDown(self): + self.pool.shutdown(wait=True) + + def testUpAndDown(self): + rear_link = rear.RearLink('nonexistent', 54321, self.pool, {}, {}) + + rear_link.start() + rear_link.stop() + + def _perform_lonely_client_test_with_ticket_kind( + self, front_to_back_ticket_kind): + test_operation_id = object() + test_method = 'test method' + fore_link = _test_links.ForeLink(None, None) + + rear_link = rear.RearLink( + 'nonexistent', 54321, self.pool, {test_method: None}, + {test_method: None}) + rear_link.join_fore_link(fore_link) + rear_link.start() + + front_to_back_ticket = packets.FrontToBackPacket( + test_operation_id, 0, front_to_back_ticket_kind, test_method, + interfaces.FULL, None, None, _TIMEOUT) + rear_link.accept_front_to_back_ticket(front_to_back_ticket) + + with fore_link.condition: + while True: + if (fore_link.tickets and + fore_link.tickets[-1].kind is not packets.Kind.CONTINUATION): + break + fore_link.condition.wait() + + rear_link.stop() + + with fore_link.condition: + self.assertIsNot(fore_link.tickets[-1].kind, packets.Kind.COMPLETION) + + @unittest.skip('TODO(nathaniel): This seems to have broken in the last few weeks; fix it.') + def testLonelyClientCommencementPacket(self): + self._perform_lonely_client_test_with_ticket_kind( + packets.Kind.COMMENCEMENT) + + def testLonelyClientEntirePacket(self): + self._perform_lonely_client_test_with_ticket_kind(packets.Kind.ENTIRE) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/src/_adapter/_low.py b/src/python/src/_adapter/_low.py new file mode 100644 index 0000000000..6c24087dad --- /dev/null +++ b/src/python/src/_adapter/_low.py @@ -0,0 +1,55 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A Python interface for GRPC C core structures and behaviors.""" + +import atexit +import gc + +from _adapter import _c +from _adapter import _datatypes + +def _shut_down(): + # force garbage collection before shutting down grpc, to ensure all grpc + # objects are cleaned up + gc.collect() + _c.shut_down() + +_c.init() +atexit.register(_shut_down) + +# pylint: disable=invalid-name +Code = _datatypes.Code +Status = _datatypes.Status +Event = _datatypes.Event +Call = _c.Call +Channel = _c.Channel +CompletionQueue = _c.CompletionQueue +Server = _c.Server +# pylint: enable=invalid-name diff --git a/src/python/src/_adapter/_low_test.py b/src/python/src/_adapter/_low_test.py new file mode 100644 index 0000000000..57b3be66a0 --- /dev/null +++ b/src/python/src/_adapter/_low_test.py @@ -0,0 +1,371 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests for _adapter._low.""" + +import time +import unittest + +from _adapter import _low + +_STREAM_LENGTH = 300 +_TIMEOUT = 5 +_AFTER_DELAY = 2 +_FUTURE = time.time() + 60 * 60 * 24 +_BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200 +_BYTE_SEQUENCE_SEQUENCE = tuple( + bytes(bytearray((row + column) % 256 for column in range(row))) + for row in range(_STREAM_LENGTH)) + + +class LonelyClientTest(unittest.TestCase): + + def testLonelyClient(self): + host = 'nosuchhostexists' + port = 54321 + method = 'test method' + deadline = time.time() + _TIMEOUT + after_deadline = deadline + _AFTER_DELAY + metadata_tag = object() + finish_tag = object() + + completion_queue = _low.CompletionQueue() + channel = _low.Channel('%s:%d' % (host, port)) + client_call = _low.Call(channel, method, host, deadline) + + client_call.invoke(completion_queue, metadata_tag, finish_tag) + first_event = completion_queue.get(after_deadline) + self.assertIsNotNone(first_event) + second_event = completion_queue.get(after_deadline) + self.assertIsNotNone(second_event) + kinds = [event.kind for event in (first_event, second_event)] + self.assertItemsEqual( + (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH), + kinds) + + self.assertIsNone(completion_queue.get(after_deadline)) + + completion_queue.stop() + stop_event = completion_queue.get(_FUTURE) + self.assertEqual(_low.Event.Kind.STOP, stop_event.kind) + + +class EchoTest(unittest.TestCase): + + def setUp(self): + self.host = 'localhost' + + self.server_completion_queue = _low.CompletionQueue() + self.server = _low.Server(self.server_completion_queue) + port = self.server.add_http2_addr('[::]:0') + self.server.start() + + self.client_completion_queue = _low.CompletionQueue() + self.channel = _low.Channel('%s:%d' % (self.host, port)) + + def tearDown(self): + self.server.stop() + # NOTE(nathaniel): Yep, this is weird; it's a consequence of + # grpc_server_destroy's being what has the effect of telling the server's + # completion queue to pump out all pending events/tags immediately rather + # than gracefully completing all outstanding RPCs while accepting no new + # ones. + # TODO(nathaniel): Deallocation of a Python object shouldn't have this kind + # of observable side effect let alone such an important one. + del self.server + self.server_completion_queue.stop() + self.client_completion_queue.stop() + while True: + event = self.server_completion_queue.get(_FUTURE) + if event is not None and event.kind is _low.Event.Kind.STOP: + break + while True: + event = self.client_completion_queue.get(_FUTURE) + if event is not None and event.kind is _low.Event.Kind.STOP: + break + self.server_completion_queue = None + self.client_completion_queue = None + + def _perform_echo_test(self, test_data): + method = 'test method' + details = 'test details' + deadline = _FUTURE + metadata_tag = object() + finish_tag = object() + write_tag = object() + complete_tag = object() + service_tag = object() + read_tag = object() + status_tag = object() + + server_data = [] + client_data = [] + + client_call = _low.Call(self.channel, method, self.host, deadline) + + client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) + + self.server.service(service_tag) + service_accepted = self.server_completion_queue.get(_FUTURE) + self.assertIsNotNone(service_accepted) + self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED) + self.assertIs(service_accepted.tag, service_tag) + self.assertEqual(method, service_accepted.service_acceptance.method) + self.assertEqual(self.host, service_accepted.service_acceptance.host) + self.assertIsNotNone(service_accepted.service_acceptance.call) + server_call = service_accepted.service_acceptance.call + server_call.accept(self.server_completion_queue, finish_tag) + server_call.premetadata() + + metadata_accepted = self.client_completion_queue.get(_FUTURE) + self.assertIsNotNone(metadata_accepted) + self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind) + self.assertEqual(metadata_tag, metadata_accepted.tag) + # TODO(nathaniel): Test transmission and reception of metadata. + + for datum in test_data: + client_call.write(datum, write_tag) + write_accepted = self.client_completion_queue.get(_FUTURE) + self.assertIsNotNone(write_accepted) + self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED) + self.assertIs(write_accepted.tag, write_tag) + self.assertIs(write_accepted.write_accepted, True) + + server_call.read(read_tag) + read_accepted = self.server_completion_queue.get(_FUTURE) + self.assertIsNotNone(read_accepted) + self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) + self.assertEqual(read_tag, read_accepted.tag) + self.assertIsNotNone(read_accepted.bytes) + server_data.append(read_accepted.bytes) + + server_call.write(read_accepted.bytes, write_tag) + write_accepted = self.server_completion_queue.get(_FUTURE) + self.assertIsNotNone(write_accepted) + self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind) + self.assertEqual(write_tag, write_accepted.tag) + self.assertTrue(write_accepted.write_accepted) + + client_call.read(read_tag) + read_accepted = self.client_completion_queue.get(_FUTURE) + self.assertIsNotNone(read_accepted) + self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) + self.assertEqual(read_tag, read_accepted.tag) + self.assertIsNotNone(read_accepted.bytes) + client_data.append(read_accepted.bytes) + + client_call.complete(complete_tag) + complete_accepted = self.client_completion_queue.get(_FUTURE) + self.assertIsNotNone(complete_accepted) + self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED) + self.assertIs(complete_accepted.tag, complete_tag) + self.assertIs(complete_accepted.complete_accepted, True) + + server_call.read(read_tag) + read_accepted = self.server_completion_queue.get(_FUTURE) + self.assertIsNotNone(read_accepted) + self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) + self.assertEqual(read_tag, read_accepted.tag) + self.assertIsNone(read_accepted.bytes) + + server_call.status(_low.Status(_low.Code.OK, details), status_tag) + server_terminal_event_one = self.server_completion_queue.get(_FUTURE) + server_terminal_event_two = self.server_completion_queue.get(_FUTURE) + if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED: + status_accepted = server_terminal_event_one + rpc_accepted = server_terminal_event_two + else: + status_accepted = server_terminal_event_two + rpc_accepted = server_terminal_event_one + self.assertIsNotNone(status_accepted) + self.assertIsNotNone(rpc_accepted) + self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind) + self.assertEqual(status_tag, status_accepted.tag) + self.assertTrue(status_accepted.complete_accepted) + self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) + self.assertEqual(finish_tag, rpc_accepted.tag) + self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status) + + client_call.read(read_tag) + client_terminal_event_one = self.client_completion_queue.get(_FUTURE) + client_terminal_event_two = self.client_completion_queue.get(_FUTURE) + if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: + read_accepted = client_terminal_event_one + finish_accepted = client_terminal_event_two + else: + read_accepted = client_terminal_event_two + finish_accepted = client_terminal_event_one + self.assertIsNotNone(read_accepted) + self.assertIsNotNone(finish_accepted) + self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) + self.assertEqual(read_tag, read_accepted.tag) + self.assertIsNone(read_accepted.bytes) + self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind) + self.assertEqual(finish_tag, finish_accepted.tag) + self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status) + + server_timeout_none_event = self.server_completion_queue.get(0) + self.assertIsNone(server_timeout_none_event) + client_timeout_none_event = self.client_completion_queue.get(0) + self.assertIsNone(client_timeout_none_event) + + self.assertSequenceEqual(test_data, server_data) + self.assertSequenceEqual(test_data, client_data) + + def testNoEcho(self): + self._perform_echo_test(()) + + def testOneByteEcho(self): + self._perform_echo_test([b'\x07']) + + def testOneManyByteEcho(self): + self._perform_echo_test([_BYTE_SEQUENCE]) + + def testManyOneByteEchoes(self): + self._perform_echo_test(_BYTE_SEQUENCE) + + def testManyManyByteEchoes(self): + self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE) + + +class CancellationTest(unittest.TestCase): + + def setUp(self): + self.host = 'localhost' + + self.server_completion_queue = _low.CompletionQueue() + self.server = _low.Server(self.server_completion_queue) + port = self.server.add_http2_addr('[::]:0') + self.server.start() + + self.client_completion_queue = _low.CompletionQueue() + self.channel = _low.Channel('%s:%d' % (self.host, port)) + + def tearDown(self): + self.server.stop() + del self.server + self.server_completion_queue.stop() + self.client_completion_queue.stop() + while True: + event = self.server_completion_queue.get(0) + if event is not None and event.kind is _low.Event.Kind.STOP: + break + while True: + event = self.client_completion_queue.get(0) + if event is not None and event.kind is _low.Event.Kind.STOP: + break + + def testCancellation(self): + method = 'test method' + deadline = _FUTURE + metadata_tag = object() + finish_tag = object() + write_tag = object() + service_tag = object() + read_tag = object() + test_data = _BYTE_SEQUENCE_SEQUENCE + + server_data = [] + client_data = [] + + client_call = _low.Call(self.channel, method, self.host, deadline) + + client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) + + self.server.service(service_tag) + service_accepted = self.server_completion_queue.get(_FUTURE) + server_call = service_accepted.service_acceptance.call + + server_call.accept(self.server_completion_queue, finish_tag) + server_call.premetadata() + + metadata_accepted = self.client_completion_queue.get(_FUTURE) + self.assertIsNotNone(metadata_accepted) + + for datum in test_data: + client_call.write(datum, write_tag) + write_accepted = self.client_completion_queue.get(_FUTURE) + + server_call.read(read_tag) + read_accepted = self.server_completion_queue.get(_FUTURE) + server_data.append(read_accepted.bytes) + + server_call.write(read_accepted.bytes, write_tag) + write_accepted = self.server_completion_queue.get(_FUTURE) + self.assertIsNotNone(write_accepted) + + client_call.read(read_tag) + read_accepted = self.client_completion_queue.get(_FUTURE) + client_data.append(read_accepted.bytes) + + client_call.cancel() + # cancel() is idempotent. + client_call.cancel() + client_call.cancel() + client_call.cancel() + + server_call.read(read_tag) + + server_terminal_event_one = self.server_completion_queue.get(_FUTURE) + server_terminal_event_two = self.server_completion_queue.get(_FUTURE) + if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: + read_accepted = server_terminal_event_one + rpc_accepted = server_terminal_event_two + else: + read_accepted = server_terminal_event_two + rpc_accepted = server_terminal_event_one + self.assertIsNotNone(read_accepted) + self.assertIsNotNone(rpc_accepted) + self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) + self.assertIsNone(read_accepted.bytes) + self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) + self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status) + + finish_event = self.client_completion_queue.get(_FUTURE) + self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind) + self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), finish_event.status) + + server_timeout_none_event = self.server_completion_queue.get(0) + self.assertIsNone(server_timeout_none_event) + client_timeout_none_event = self.client_completion_queue.get(0) + self.assertIsNone(client_timeout_none_event) + + self.assertSequenceEqual(test_data, server_data) + self.assertSequenceEqual(test_data, client_data) + + +class ExpirationTest(unittest.TestCase): + + @unittest.skip('TODO(nathaniel): Expiration test!') + def testExpiration(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/src/_adapter/_proto_scenarios.py b/src/python/src/_adapter/_proto_scenarios.py new file mode 100644 index 0000000000..c452fb523a --- /dev/null +++ b/src/python/src/_adapter/_proto_scenarios.py @@ -0,0 +1,261 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test scenarios using protocol buffers.""" + +import abc +import threading + +from _junkdrawer import math_pb2 + + +class ProtoScenario(object): + """An RPC test scenario using protocol buffers.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def method(self): + """Access the test method name. + + Returns: + The test method name. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_request(self, request): + """Serialize a request protocol buffer. + + Args: + request: A request protocol buffer. + + Returns: + The bytestring serialization of the given request protocol buffer. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_request(self, request_bytestring): + """Deserialize a request protocol buffer. + + Args: + request_bytestring: The bytestring serialization of a request protocol + buffer. + + Returns: + The request protocol buffer deserialized from the given byte string. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_response(self, response): + """Serialize a response protocol buffer. + + Args: + response: A response protocol buffer. + + Returns: + The bytestring serialization of the given response protocol buffer. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_response(self, response_bytestring): + """Deserialize a response protocol buffer. + + Args: + response_bytestring: The bytestring serialization of a response protocol + buffer. + + Returns: + The response protocol buffer deserialized from the given byte string. + """ + raise NotImplementedError() + + @abc.abstractmethod + def requests(self): + """Access the sequence of requests for this scenario. + + Returns: + A sequence of request protocol buffers. + """ + raise NotImplementedError() + + @abc.abstractmethod + def response_for_request(self, request): + """Access the response for a particular request. + + Args: + request: A request protocol buffer. + + Returns: + The response protocol buffer appropriate for the given request. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify_requests(self, experimental_requests): + """Verify the requests transmitted through the system under test. + + Args: + experimental_requests: The request protocol buffers transmitted through + the system under test. + + Returns: + True if the requests satisfy this test scenario; False otherwise. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify_responses(self, experimental_responses): + """Verify the responses transmitted through the system under test. + + Args: + experimental_responses: The response protocol buffers transmitted through + the system under test. + + Returns: + True if the responses satisfy this test scenario; False otherwise. + """ + raise NotImplementedError() + + +class EmptyScenario(ProtoScenario): + """A scenario that transmits no protocol buffers in either direction.""" + + def method(self): + return 'DivMany' + + def serialize_request(self, request): + raise ValueError('This should not be necessary to call!') + + def deserialize_request(self, request_bytestring): + raise ValueError('This should not be necessary to call!') + + def serialize_response(self, response): + raise ValueError('This should not be necessary to call!') + + def deserialize_response(self, response_bytestring): + raise ValueError('This should not be necessary to call!') + + def requests(self): + return () + + def response_for_request(self, request): + raise ValueError('This should not be necessary to call!') + + def verify_requests(self, experimental_requests): + return not experimental_requests + + def verify_responses(self, experimental_responses): + return not experimental_responses + + +class BidirectionallyUnaryScenario(ProtoScenario): + """A scenario that transmits no protocol buffers in either direction.""" + + _DIVIDEND = 59 + _DIVISOR = 7 + _QUOTIENT = 8 + _REMAINDER = 3 + + _REQUEST = math_pb2.DivArgs(dividend=_DIVIDEND, divisor=_DIVISOR) + _RESPONSE = math_pb2.DivReply(quotient=_QUOTIENT, remainder=_REMAINDER) + + def method(self): + return 'Div' + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, request_bytestring): + return math_pb2.DivArgs.FromString(request_bytestring) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, response_bytestring): + return math_pb2.DivReply.FromString(response_bytestring) + + def requests(self): + return [self._REQUEST] + + def response_for_request(self, request): + return self._RESPONSE + + def verify_requests(self, experimental_requests): + return tuple(experimental_requests) == (self._REQUEST,) + + def verify_responses(self, experimental_responses): + return tuple(experimental_responses) == (self._RESPONSE,) + + +class BidirectionallyStreamingScenario(ProtoScenario): + """A scenario that transmits no protocol buffers in either direction.""" + + _STREAM_LENGTH = 200 + _REQUESTS = tuple( + math_pb2.DivArgs(dividend=59 + index, divisor=7 + index) + for index in range(_STREAM_LENGTH)) + + def __init__(self): + self._lock = threading.Lock() + self._responses = [] + + def method(self): + return 'DivMany' + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, request_bytestring): + return math_pb2.DivArgs.FromString(request_bytestring) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, response_bytestring): + return math_pb2.DivReply.FromString(response_bytestring) + + def requests(self): + return self._REQUESTS + + def response_for_request(self, request): + quotient, remainder = divmod(request.dividend, request.divisor) + response = math_pb2.DivReply(quotient=quotient, remainder=remainder) + with self._lock: + self._responses.append(response) + return response + + def verify_requests(self, experimental_requests): + return tuple(experimental_requests) == self._REQUESTS + + def verify_responses(self, experimental_responses): + with self._lock: + return tuple(experimental_responses) == tuple(self._responses) diff --git a/src/python/src/_adapter/_server.c b/src/python/src/_adapter/_server.c new file mode 100644 index 0000000000..a40d32ff51 --- /dev/null +++ b/src/python/src/_adapter/_server.c @@ -0,0 +1,167 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "_adapter/_server.h" + +#include <Python.h> +#include <grpc/grpc.h> + +#include "_adapter/_completion_queue.h" +#include "_adapter/_error.h" + +static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) { + const PyObject *completion_queue; + if (!(PyArg_ParseTuple(args, "O!", &pygrpc_CompletionQueueType, + &completion_queue))) { + self->c_server = NULL; + return -1; + } + + self->c_server = grpc_server_create( + ((CompletionQueue *)completion_queue)->c_completion_queue, NULL); + return 0; +} + +static void pygrpc_server_dealloc(Server *self) { + if (self->c_server != NULL) { + grpc_server_destroy(self->c_server); + } + self->ob_type->tp_free((PyObject *)self); +} + +static PyObject *pygrpc_server_add_http2_addr(Server *self, PyObject *args) { + const char *addr; + int port; + PyArg_ParseTuple(args, "s", &addr); + + port = grpc_server_add_http2_port(self->c_server, addr); + if (port == 0) { + PyErr_SetString(PyExc_RuntimeError, "Couldn't add port to server!"); + return NULL; + } + + return PyInt_FromLong(port); +} + +static PyObject *pygrpc_server_start(Server *self) { + grpc_server_start(self->c_server); + + Py_RETURN_NONE; +} + +static const PyObject *pygrpc_server_service(Server *self, PyObject *args) { + const PyObject *tag; + grpc_call_error call_error; + const PyObject *result; + + if (!(PyArg_ParseTuple(args, "O", &tag))) { + return NULL; + } + + call_error = grpc_server_request_call(self->c_server, (void *)tag); + + result = pygrpc_translate_call_error(call_error); + if (result != NULL) { + Py_INCREF(tag); + } + return result; +} + +static PyObject *pygrpc_server_stop(Server *self) { + grpc_server_shutdown(self->c_server); + + Py_RETURN_NONE; +} + +static PyMethodDef methods[] = { + {"add_http2_addr", (PyCFunction)pygrpc_server_add_http2_addr, METH_VARARGS, + "Add an HTTP2 address."}, + {"start", (PyCFunction)pygrpc_server_start, METH_NOARGS, + "Starts the server."}, + {"service", (PyCFunction)pygrpc_server_service, METH_VARARGS, + "Services a call."}, + {"stop", (PyCFunction)pygrpc_server_stop, METH_NOARGS, "Stops the server."}, + {NULL}}; + +static PyTypeObject pygrpc_ServerType = { + PyObject_HEAD_INIT(NULL)0, /*ob_size*/ + "_gprc.Server", /*tp_name*/ + sizeof(Server), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)pygrpc_server_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /*tp_flags*/ + "Wrapping of grpc_server.", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)pygrpc_server_init, /* tp_init */ +}; + +int pygrpc_add_server(PyObject *module) { + pygrpc_ServerType.tp_new = PyType_GenericNew; + if (PyType_Ready(&pygrpc_ServerType) < 0) { + PyErr_SetString(PyExc_RuntimeError, "Error defining pygrpc_ServerType!"); + return -1; + } + if (PyModule_AddObject(module, "Server", (PyObject *)&pygrpc_ServerType) == + -1) { + PyErr_SetString(PyExc_ImportError, "Couldn't add Server type to module!"); + return -1; + } + return 0; +} diff --git a/src/python/src/_adapter/_server.h b/src/python/src/_adapter/_server.h new file mode 100644 index 0000000000..0c517e3715 --- /dev/null +++ b/src/python/src/_adapter/_server.h @@ -0,0 +1,44 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef _ADAPTER__SERVER_H_ +#define _ADAPTER__SERVER_H_ + +#include <Python.h> +#include <grpc/grpc.h> + +typedef struct { PyObject_HEAD grpc_server *c_server; } Server; + +int pygrpc_add_server(PyObject *module); + +#endif /* _ADAPTER__SERVER_H_ */ diff --git a/src/python/src/_adapter/_test_links.py b/src/python/src/_adapter/_test_links.py new file mode 100644 index 0000000000..77d1b00f36 --- /dev/null +++ b/src/python/src/_adapter/_test_links.py @@ -0,0 +1,80 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Links suitable for use in tests.""" + +import threading + +from _framework.base.packets import interfaces + + +class ForeLink(interfaces.ForeLink): + """A ForeLink suitable for use in tests of RearLinks.""" + + def __init__(self, action, rear_link): + self.condition = threading.Condition() + self.tickets = [] + self.action = action + self.rear_link = rear_link + + def accept_back_to_front_ticket(self, ticket): + with self.condition: + self.tickets.append(ticket) + self.condition.notify_all() + action, rear_link = self.action, self.rear_link + + if action is not None: + action(ticket, rear_link) + + def join_rear_link(self, rear_link): + with self.condition: + self.rear_link = rear_link + + +class RearLink(interfaces.RearLink): + """A RearLink suitable for use in tests of ForeLinks.""" + + def __init__(self, action, fore_link): + self.condition = threading.Condition() + self.tickets = [] + self.action = action + self.fore_link = fore_link + + def accept_front_to_back_ticket(self, ticket): + with self.condition: + self.tickets.append(ticket) + self.condition.notify_all() + action, fore_link = self.action, self.fore_link + + if action is not None: + action(ticket, fore_link) + + def join_fore_link(self, fore_link): + with self.condition: + self.fore_link = fore_link diff --git a/src/python/src/_adapter/fore.py b/src/python/src/_adapter/fore.py new file mode 100644 index 0000000000..0f00957938 --- /dev/null +++ b/src/python/src/_adapter/fore.py @@ -0,0 +1,309 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""The RPC-service-side bridge between RPC Framework and GRPC-on-the-wire.""" + +import enum +import logging +import threading +import time + +from _adapter import _common +from _adapter import _low +from _framework.base import interfaces +from _framework.base.packets import interfaces as ticket_interfaces +from _framework.base.packets import null +from _framework.base.packets import packets as tickets + + +@enum.unique +class _LowWrite(enum.Enum): + """The possible categories of low-level write state.""" + + OPEN = 'OPEN' + ACTIVE = 'ACTIVE' + CLOSED = 'CLOSED' + + +def _write(call, rpc_state, payload): + serialized_payload = rpc_state.serializer(payload) + if rpc_state.write.low is _LowWrite.OPEN: + call.write(serialized_payload, call) + rpc_state.write.low = _LowWrite.ACTIVE + else: + rpc_state.write.pending.append(serialized_payload) + + +def _status(call, rpc_state): + call.status(_low.Status(_low.Code.OK, ''), call) + rpc_state.write.low = _LowWrite.CLOSED + + +class ForeLink(ticket_interfaces.ForeLink): + """A service-side bridge between RPC Framework and the C-ish _low code.""" + + def __init__( + self, pool, request_deserializers, response_serializers, port=None): + """Constructor. + + Args: + pool: A thread pool. + request_deserializers: A dict from RPC method names to request object + deserializer behaviors. + response_serializers: A dict from RPC method names to response object + serializer behaviors. + port: The port on which to serve, or None to have a port selected + automatically. + """ + self._condition = threading.Condition() + self._pool = pool + self._request_deserializers = request_deserializers + self._response_serializers = response_serializers + self._port = port + + self._rear_link = null.NULL_REAR_LINK + self._completion_queue = None + self._server = None + self._rpc_states = {} + self._spinning = False + + def _on_stop_event(self): + self._spinning = False + self._condition.notify_all() + + def _on_service_acceptance_event(self, event, server): + """Handle a service invocation event.""" + service_acceptance = event.service_acceptance + if service_acceptance is None: + return + + call = service_acceptance.call + call.accept(self._completion_queue, call) + # TODO(nathaniel): Metadata support. + call.premetadata() + call.read(call) + method = service_acceptance.method + + self._rpc_states[call] = _common.CommonRPCState( + _common.WriteState(_LowWrite.OPEN, _common.HighWrite.OPEN, []), 1, + self._request_deserializers[method], + self._response_serializers[method]) + + ticket = tickets.FrontToBackPacket( + call, 0, tickets.Kind.COMMENCEMENT, method, interfaces.FULL, None, None, + service_acceptance.deadline - time.time()) + self._rear_link.accept_front_to_back_ticket(ticket) + + server.service(None) + + def _on_read_event(self, event): + """Handle data arriving during an RPC.""" + call = event.tag + rpc_state = self._rpc_states.get(call, None) + if rpc_state is None: + return + + sequence_number = rpc_state.sequence_number + rpc_state.sequence_number += 1 + if event.bytes is None: + ticket = tickets.FrontToBackPacket( + call, sequence_number, tickets.Kind.COMPLETION, None, None, None, + None, None) + else: + call.read(call) + ticket = tickets.FrontToBackPacket( + call, sequence_number, tickets.Kind.CONTINUATION, None, None, None, + rpc_state.deserializer(event.bytes), None) + + self._rear_link.accept_front_to_back_ticket(ticket) + + def _on_write_event(self, event): + call = event.tag + rpc_state = self._rpc_states.get(call, None) + if rpc_state is None: + return + + if rpc_state.write.pending: + serialized_payload = rpc_state.write.pending.pop(0) + call.write(serialized_payload, call) + elif rpc_state.write.high is _common.HighWrite.CLOSED: + _status(call, rpc_state) + else: + rpc_state.write.low = _LowWrite.OPEN + + def _on_complete_event(self, event): + if not event.complete_accepted: + logging.error('Complete not accepted! %s', (event,)) + call = event.tag + rpc_state = self._rpc_states.pop(call, None) + if rpc_state is None: + return + + sequence_number = rpc_state.sequence_number + rpc_state.sequence_number += 1 + ticket = tickets.FrontToBackPacket( + call, sequence_number, tickets.Kind.TRANSMISSION_FAILURE, None, None, + None, None, None) + self._rear_link.accept_front_to_back_ticket(ticket) + + def _on_finish_event(self, event): + """Handle termination of an RPC.""" + call = event.tag + rpc_state = self._rpc_states.pop(call, None) + if rpc_state is None: + return + + code = event.status.code + if code is _low.Code.OK: + return + + sequence_number = rpc_state.sequence_number + rpc_state.sequence_number += 1 + if code is _low.Code.CANCELLED: + ticket = tickets.FrontToBackPacket( + call, sequence_number, tickets.Kind.CANCELLATION, None, None, None, + None, None) + elif code is _low.Code.EXPIRED: + ticket = tickets.FrontToBackPacket( + call, sequence_number, tickets.Kind.EXPIRATION, None, None, None, + None, None) + else: + # TODO(nathaniel): Better mapping of codes to ticket-categories + ticket = tickets.FrontToBackPacket( + call, sequence_number, tickets.Kind.TRANSMISSION_FAILURE, None, None, + None, None, None) + self._rear_link.accept_front_to_back_ticket(ticket) + + def _spin(self, completion_queue, server): + while True: + event = completion_queue.get(None) + + with self._condition: + if event.kind is _low.Event.Kind.STOP: + self._on_stop_event() + return + elif self._server is None: + continue + elif event.kind is _low.Event.Kind.SERVICE_ACCEPTED: + self._on_service_acceptance_event(event, server) + elif event.kind is _low.Event.Kind.READ_ACCEPTED: + self._on_read_event(event) + elif event.kind is _low.Event.Kind.WRITE_ACCEPTED: + self._on_write_event(event) + elif event.kind is _low.Event.Kind.COMPLETE_ACCEPTED: + self._on_complete_event(event) + elif event.kind is _low.Event.Kind.FINISH: + self._on_finish_event(event) + else: + logging.error('Illegal event! %s', (event,)) + + def _continue(self, call, payload): + rpc_state = self._rpc_states.get(call, None) + if rpc_state is None: + return + + _write(call, rpc_state, payload) + + def _complete(self, call, payload): + """Handle completion of the writes of an RPC.""" + rpc_state = self._rpc_states.get(call, None) + if rpc_state is None: + return + + if rpc_state.write.low is _LowWrite.OPEN: + if payload is None: + _status(call, rpc_state) + else: + _write(call, rpc_state, payload) + elif rpc_state.write.low is _LowWrite.ACTIVE: + if payload is not None: + rpc_state.write.pending.append(rpc_state.serializer(payload)) + else: + raise ValueError('Called to complete after having already completed!') + rpc_state.write.high = _common.HighWrite.CLOSED + + def _cancel(self, call): + call.cancel() + self._rpc_states.pop(call, None) + + def join_rear_link(self, rear_link): + """See ticket_interfaces.ForeLink.join_rear_link for specification.""" + self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link + + def start(self): + """Starts this ForeLink. + + This method must be called before attempting to exchange tickets with this + object. + """ + with self._condition: + self._completion_queue = _low.CompletionQueue() + self._server = _low.Server(self._completion_queue) + port = self._server.add_http2_addr( + '[::]:%d' % (0 if self._port is None else self._port)) + self._server.start() + + self._server.service(None) + + self._pool.submit(self._spin, self._completion_queue, self._server) + self._spinning = True + + return port + + # TODO(nathaniel): Expose graceful-shutdown semantics in which this object + # enters a state in which it finishes ongoing RPCs but refuses new ones. + def stop(self): + """Stops this ForeLink. + + This method must be called for proper termination of this object, and no + attempts to exchange tickets with this object may be made after this method + has been called. + """ + with self._condition: + self._server.stop() + # TODO(b/18904187): Yep, this is weird. Deleting a server shouldn't have a + # behaviorally significant side-effect. + self._server = None + self._completion_queue.stop() + + while self._spinning: + self._condition.wait() + + def accept_back_to_front_ticket(self, ticket): + """See ticket_interfaces.ForeLink.accept_back_to_front_ticket for spec.""" + with self._condition: + if self._server is None: + return + + if ticket.kind is tickets.Kind.CONTINUATION: + self._continue(ticket.operation_id, ticket.payload) + elif ticket.kind is tickets.Kind.COMPLETION: + self._complete(ticket.operation_id, ticket.payload) + else: + self._cancel(ticket.operation_id) diff --git a/src/python/src/_adapter/rear.py b/src/python/src/_adapter/rear.py new file mode 100644 index 0000000000..5e0975ab4e --- /dev/null +++ b/src/python/src/_adapter/rear.py @@ -0,0 +1,344 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""The RPC-invocation-side bridge between RPC Framework and GRPC-on-the-wire.""" + +import enum +import logging +import threading +import time + +from _adapter import _common +from _adapter import _low +from _framework.base.packets import interfaces as ticket_interfaces +from _framework.base.packets import null +from _framework.base.packets import packets as tickets + +_INVOCATION_EVENT_KINDS = ( + _low.Event.Kind.METADATA_ACCEPTED, + _low.Event.Kind.FINISH +) + + +@enum.unique +class _LowWrite(enum.Enum): + """The possible categories of low-level write state.""" + + OPEN = 'OPEN' + ACTIVE = 'ACTIVE' + CLOSED = 'CLOSED' + + +class _RPCState(object): + """The full state of any tracked RPC. + + Attributes: + call: The _low.Call object for the RPC. + outstanding: The set of Event.Kind values describing expected future events + for the RPC. + active: A boolean indicating whether or not the RPC is active. + common: An _common.RPCState describing additional state for the RPC. + """ + + def __init__(self, call, outstanding, active, common): + self.call = call + self.outstanding = outstanding + self.active = active + self.common = common + + +def _write(operation_id, call, outstanding, write_state, serialized_payload): + if write_state.low is _LowWrite.OPEN: + call.write(serialized_payload, operation_id) + outstanding.add(_low.Event.Kind.WRITE_ACCEPTED) + write_state.low = _LowWrite.ACTIVE + elif write_state.low is _LowWrite.ACTIVE: + write_state.pending.append(serialized_payload) + else: + raise ValueError('Write attempted after writes completed!') + + +class RearLink(ticket_interfaces.RearLink): + """An invocation-side bridge between RPC Framework and the C-ish _low code.""" + + def __init__( + self, host, port, pool, request_serializers, response_deserializers): + """Constructor. + + Args: + host: The host to which to connect for RPC service. + port: The port to which to connect for RPC service. + pool: A thread pool. + request_serializers: A dict from RPC method names to request object + serializer behaviors. + response_deserializers: A dict from RPC method names to response object + deserializer behaviors. + """ + self._condition = threading.Condition() + self._host = host + self._port = port + self._pool = pool + self._request_serializers = request_serializers + self._response_deserializers = response_deserializers + + self._fore_link = null.NULL_FORE_LINK + self._completion_queue = None + self._channel = None + self._rpc_states = {} + self._spinning = False + + def _on_write_event(self, operation_id, event, rpc_state): + if event.write_accepted: + if rpc_state.common.write.pending: + rpc_state.call.write( + rpc_state.common.write.pending.pop(0), operation_id) + rpc_state.outstanding.add(_low.Event.Kind.WRITE_ACCEPTED) + elif rpc_state.common.write.high is _common.HighWrite.CLOSED: + rpc_state.call.complete(operation_id) + rpc_state.outstanding.add(_low.Event.Kind.COMPLETE_ACCEPTED) + rpc_state.common.write.low = _LowWrite.CLOSED + else: + rpc_state.common.write.low = _LowWrite.OPEN + else: + logging.error('RPC write not accepted! Event: %s', (event,)) + rpc_state.active = False + ticket = tickets.BackToFrontPacket( + operation_id, rpc_state.common.sequence_number, + tickets.Kind.TRANSMISSION_FAILURE, None) + rpc_state.common.sequence_number += 1 + self._fore_link.accept_back_to_front_ticket(ticket) + + def _on_read_event(self, operation_id, event, rpc_state): + if event.bytes is not None: + rpc_state.call.read(operation_id) + rpc_state.outstanding.add(_low.Event.Kind.READ_ACCEPTED) + + ticket = tickets.BackToFrontPacket( + operation_id, rpc_state.common.sequence_number, + tickets.Kind.CONTINUATION, rpc_state.common.deserializer(event.bytes)) + rpc_state.common.sequence_number += 1 + self._fore_link.accept_back_to_front_ticket(ticket) + + def _on_complete_event(self, operation_id, event, rpc_state): + if not event.complete_accepted: + logging.error('RPC complete not accepted! Event: %s', (event,)) + rpc_state.active = False + ticket = tickets.BackToFrontPacket( + operation_id, rpc_state.common.sequence_number, + tickets.Kind.TRANSMISSION_FAILURE, None) + rpc_state.common.sequence_number += 1 + self._fore_link.accept_back_to_front_ticket(ticket) + + # TODO(nathaniel): Metadata support. + def _on_metadata_event(self, operation_id, event, rpc_state): # pylint: disable=unused-argument + rpc_state.call.read(operation_id) + rpc_state.outstanding.add(_low.Event.Kind.READ_ACCEPTED) + + def _on_finish_event(self, operation_id, event, rpc_state): + """Handle termination of an RPC.""" + # TODO(nathaniel): Cover all statuses. + if event.status.code is _low.Code.OK: + category = tickets.Kind.COMPLETION + elif event.status.code is _low.Code.CANCELLED: + category = tickets.Kind.CANCELLATION + elif event.status.code is _low.Code.EXPIRED: + category = tickets.Kind.EXPIRATION + else: + category = tickets.Kind.TRANSMISSION_FAILURE + ticket = tickets.BackToFrontPacket( + operation_id, rpc_state.common.sequence_number, category, + None) + rpc_state.common.sequence_number += 1 + self._fore_link.accept_back_to_front_ticket(ticket) + + def _spin(self, completion_queue): + while True: + event = completion_queue.get(None) + operation_id = event.tag + + with self._condition: + rpc_state = self._rpc_states[operation_id] + rpc_state.outstanding.remove(event.kind) + if rpc_state.active and self._completion_queue is not None: + if event.kind is _low.Event.Kind.WRITE_ACCEPTED: + self._on_write_event(operation_id, event, rpc_state) + elif event.kind is _low.Event.Kind.METADATA_ACCEPTED: + self._on_metadata_event(operation_id, event, rpc_state) + elif event.kind is _low.Event.Kind.READ_ACCEPTED: + self._on_read_event(operation_id, event, rpc_state) + elif event.kind is _low.Event.Kind.COMPLETE_ACCEPTED: + self._on_complete_event(operation_id, event, rpc_state) + elif event.kind is _low.Event.Kind.FINISH: + self._on_finish_event(operation_id, event, rpc_state) + else: + logging.error('Illegal RPC event! %s', (event,)) + + if not rpc_state.outstanding: + self._rpc_states.pop(operation_id) + if not self._rpc_states: + self._spinning = False + self._condition.notify_all() + return + + def _invoke(self, operation_id, name, high_state, payload, timeout): + """Invoke an RPC. + + Args: + operation_id: Any object to be used as an operation ID for the RPC. + name: The RPC method name. + high_state: A _common.HighWrite value representing the "high write state" + of the RPC. + payload: A payload object for the RPC or None if no payload was given at + invocation-time. + timeout: A duration of time in seconds to allow for the RPC. + """ + request_serializer = self._request_serializers[name] + call = _low.Call(self._channel, name, self._host, time.time() + timeout) + call.invoke(self._completion_queue, operation_id, operation_id) + outstanding = set(_INVOCATION_EVENT_KINDS) + + if payload is None: + if high_state is _common.HighWrite.CLOSED: + call.complete(operation_id) + low_state = _LowWrite.CLOSED + outstanding.add(_low.Event.Kind.COMPLETE_ACCEPTED) + else: + low_state = _LowWrite.OPEN + else: + serialized_payload = request_serializer(payload) + call.write(serialized_payload, operation_id) + outstanding.add(_low.Event.Kind.WRITE_ACCEPTED) + low_state = _LowWrite.ACTIVE + + write_state = _common.WriteState(low_state, high_state, []) + common_state = _common.CommonRPCState( + write_state, 0, self._response_deserializers[name], request_serializer) + self._rpc_states[operation_id] = _RPCState( + call, outstanding, True, common_state) + + if not self._spinning: + self._pool.submit(self._spin, self._completion_queue) + self._spinning = True + + def _commence(self, operation_id, name, payload, timeout): + self._invoke(operation_id, name, _common.HighWrite.OPEN, payload, timeout) + + def _continue(self, operation_id, payload): + rpc_state = self._rpc_states.get(operation_id, None) + if rpc_state is None or not rpc_state.active: + return + + _write( + operation_id, rpc_state.call, rpc_state.outstanding, + rpc_state.common.write, rpc_state.common.serializer(payload)) + + def _complete(self, operation_id, payload): + """Close writes associated with an ongoing RPC. + + Args: + operation_id: Any object being use as an operation ID for the RPC. + payload: A payload object for the RPC (and thus the last payload object + for the RPC) or None if no payload was given along with the instruction + to indicate the end of writes for the RPC. + """ + rpc_state = self._rpc_states.get(operation_id, None) + if rpc_state is None or not rpc_state.active: + return + + write_state = rpc_state.common.write + if payload is None: + if write_state.low is _LowWrite.OPEN: + rpc_state.call.complete(operation_id) + rpc_state.outstanding.add(_low.Event.Kind.COMPLETE_ACCEPTED) + write_state.low = _LowWrite.CLOSED + else: + _write( + operation_id, rpc_state.call, rpc_state.outstanding, write_state, + rpc_state.common.serializer(payload)) + write_state.high = _common.HighWrite.CLOSED + + def _entire(self, operation_id, name, payload, timeout): + self._invoke(operation_id, name, _common.HighWrite.CLOSED, payload, timeout) + + def _cancel(self, operation_id): + rpc_state = self._rpc_states.get(operation_id, None) + if rpc_state is not None and rpc_state.active: + rpc_state.call.cancel() + rpc_state.active = False + + def join_fore_link(self, fore_link): + """See ticket_interfaces.RearLink.join_fore_link for specification.""" + with self._condition: + self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link + + def start(self): + """Starts this RearLink. + + This method must be called before attempting to exchange tickets with this + object. + """ + with self._condition: + self._completion_queue = _low.CompletionQueue() + self._channel = _low.Channel('%s:%d' % (self._host, self._port)) + + def stop(self): + """Stops this RearLink. + + This method must be called for proper termination of this object, and no + attempts to exchange tickets with this object may be made after this method + has been called. + """ + with self._condition: + self._completion_queue.stop() + self._completion_queue = None + + while self._spinning: + self._condition.wait() + + def accept_front_to_back_ticket(self, ticket): + """See ticket_interfaces.RearLink.accept_front_to_back_ticket for spec.""" + with self._condition: + if self._completion_queue is None: + return + + if ticket.kind is tickets.Kind.COMMENCEMENT: + self._commence( + ticket.operation_id, ticket.name, ticket.payload, ticket.timeout) + elif ticket.kind is tickets.Kind.CONTINUATION: + self._continue(ticket.operation_id, ticket.payload) + elif ticket.kind is tickets.Kind.COMPLETION: + self._complete(ticket.operation_id, ticket.payload) + elif ticket.kind is tickets.Kind.ENTIRE: + self._entire( + ticket.operation_id, ticket.name, ticket.payload, ticket.timeout) + elif ticket.kind is tickets.Kind.CANCELLATION: + self._cancel(ticket.operation_id) + else: + # NOTE(nathaniel): All other categories are treated as cancellation. + self._cancel(ticket.operation_id) diff --git a/src/python/src/_junkdrawer/math_pb2.py b/src/python/src/_junkdrawer/math_pb2.py new file mode 100644 index 0000000000..20165955b4 --- /dev/null +++ b/src/python/src/_junkdrawer/math_pb2.py @@ -0,0 +1,266 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# TODO(nathaniel): Remove this from source control after having made +# generation from the math.proto source part of GRPC's build-and-test +# process. + +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: math.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='math.proto', + package='math', + serialized_pb=_b('\n\nmath.proto\x12\x04math\",\n\x07\x44ivArgs\x12\x10\n\x08\x64ividend\x18\x01 \x02(\x03\x12\x0f\n\x07\x64ivisor\x18\x02 \x02(\x03\"/\n\x08\x44ivReply\x12\x10\n\x08quotient\x18\x01 \x02(\x03\x12\x11\n\tremainder\x18\x02 \x02(\x03\"\x18\n\x07\x46ibArgs\x12\r\n\x05limit\x18\x01 \x01(\x03\"\x12\n\x03Num\x12\x0b\n\x03num\x18\x01 \x02(\x03\"\x19\n\x08\x46ibReply\x12\r\n\x05\x63ount\x18\x01 \x02(\x03\x32\xa4\x01\n\x04Math\x12&\n\x03\x44iv\x12\r.math.DivArgs\x1a\x0e.math.DivReply\"\x00\x12.\n\x07\x44ivMany\x12\r.math.DivArgs\x1a\x0e.math.DivReply\"\x00(\x01\x30\x01\x12#\n\x03\x46ib\x12\r.math.FibArgs\x1a\t.math.Num\"\x00\x30\x01\x12\x1f\n\x03Sum\x12\t.math.Num\x1a\t.math.Num\"\x00(\x01') +) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + + + + +_DIVARGS = _descriptor.Descriptor( + name='DivArgs', + full_name='math.DivArgs', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='dividend', full_name='math.DivArgs.dividend', index=0, + number=1, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='divisor', full_name='math.DivArgs.divisor', index=1, + number=2, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=20, + serialized_end=64, +) + + +_DIVREPLY = _descriptor.Descriptor( + name='DivReply', + full_name='math.DivReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='quotient', full_name='math.DivReply.quotient', index=0, + number=1, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='remainder', full_name='math.DivReply.remainder', index=1, + number=2, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=66, + serialized_end=113, +) + + +_FIBARGS = _descriptor.Descriptor( + name='FibArgs', + full_name='math.FibArgs', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='limit', full_name='math.FibArgs.limit', index=0, + number=1, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=115, + serialized_end=139, +) + + +_NUM = _descriptor.Descriptor( + name='Num', + full_name='math.Num', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='num', full_name='math.Num.num', index=0, + number=1, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=141, + serialized_end=159, +) + + +_FIBREPLY = _descriptor.Descriptor( + name='FibReply', + full_name='math.FibReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='count', full_name='math.FibReply.count', index=0, + number=1, type=3, cpp_type=2, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=161, + serialized_end=186, +) + +DESCRIPTOR.message_types_by_name['DivArgs'] = _DIVARGS +DESCRIPTOR.message_types_by_name['DivReply'] = _DIVREPLY +DESCRIPTOR.message_types_by_name['FibArgs'] = _FIBARGS +DESCRIPTOR.message_types_by_name['Num'] = _NUM +DESCRIPTOR.message_types_by_name['FibReply'] = _FIBREPLY + +DivArgs = _reflection.GeneratedProtocolMessageType('DivArgs', (_message.Message,), dict( + DESCRIPTOR = _DIVARGS, + __module__ = 'math_pb2' + # @@protoc_insertion_point(class_scope:math.DivArgs) + )) +_sym_db.RegisterMessage(DivArgs) + +DivReply = _reflection.GeneratedProtocolMessageType('DivReply', (_message.Message,), dict( + DESCRIPTOR = _DIVREPLY, + __module__ = 'math_pb2' + # @@protoc_insertion_point(class_scope:math.DivReply) + )) +_sym_db.RegisterMessage(DivReply) + +FibArgs = _reflection.GeneratedProtocolMessageType('FibArgs', (_message.Message,), dict( + DESCRIPTOR = _FIBARGS, + __module__ = 'math_pb2' + # @@protoc_insertion_point(class_scope:math.FibArgs) + )) +_sym_db.RegisterMessage(FibArgs) + +Num = _reflection.GeneratedProtocolMessageType('Num', (_message.Message,), dict( + DESCRIPTOR = _NUM, + __module__ = 'math_pb2' + # @@protoc_insertion_point(class_scope:math.Num) + )) +_sym_db.RegisterMessage(Num) + +FibReply = _reflection.GeneratedProtocolMessageType('FibReply', (_message.Message,), dict( + DESCRIPTOR = _FIBREPLY, + __module__ = 'math_pb2' + # @@protoc_insertion_point(class_scope:math.FibReply) + )) +_sym_db.RegisterMessage(FibReply) + + +# @@protoc_insertion_point(module_scope) |