aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-05 21:08:33 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-05 21:08:33 -0700
commit27166a6f65581a9ee820882e5d7506d6bd07ade6 (patch)
tree0e89c5e09a955b2a074a8b34e0f3fe0a4135f346 /src/python
parent4efb6966bdfb62c725c6614b0d85ea374250bb51 (diff)
parentd1dd3a68a2d4af56f1409327c197590dac6968cb (diff)
Merge github.com:grpc/grpc into flow-like-lava-to-a-barnyard
Conflicts: src/core/surface/call.c src/core/transport/chttp2_transport.c src/core/transport/transport.h
Diffstat (limited to 'src/python')
-rw-r--r--src/python/README.md69
-rw-r--r--src/python/requirements.txt2
-rw-r--r--src/python/src/README.rst22
-rw-r--r--src/python/src/grpc/_adapter/_c/types.h6
-rw-r--r--src/python/src/grpc/_adapter/_c/types/server.c13
-rw-r--r--src/python/src/grpc/_adapter/_c/utility.c103
-rw-r--r--src/python/src/grpc/_adapter/_intermediary_low.py11
-rw-r--r--src/python/src/grpc/_adapter/_intermediary_low_test.py109
-rw-r--r--src/python/src/grpc/_adapter/_links_test.py2
-rw-r--r--src/python/src/grpc/_adapter/_low.py7
-rw-r--r--src/python/src/grpc/_adapter/_low_test.py36
-rw-r--r--src/python/src/setup.py4
12 files changed, 225 insertions, 159 deletions
diff --git a/src/python/README.md b/src/python/README.md
index c67201b670..2beb3a913a 100644
--- a/src/python/README.md
+++ b/src/python/README.md
@@ -1,25 +1,34 @@
gRPC Python
=========
-
The Python facility of gRPC.
-
Status
-------
-
Alpha : Ready for early adopters
-Prerequisites
------------------------
-
-Python 2.7, virtualenv, pip, libprotobuf-dev, and libprotoc-dev.
-
+PREREQUISITES
+-------------
+- Python 2.7, virtualenv, pip
+- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core.
+
+INSTALLATION
+-------------
+On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][].
+Run the following command to install gRPC Python.
+```sh
+$ curl -fsSL https://goo.gl/getgrpc | bash -s python
+```
+This will download and run the [gRPC install script][], then install the latest version of the gRPC Python package. It also installs the Protocol Buffers compiler (_protoc_) and the gRPC _protoc_ plugin for python.
-Building from source
-----------------------
+EXAMPLES
+--------
+Please read our online documentation for a [Quick Start][] and a [detailed example][]
+BUILDING FROM SOURCE
+---------------------
+- Clone this repository
- Build the gRPC core from the root of the
- [gRPC git repo](https://github.com/grpc/grpc)
+ [gRPC Git repository](https://github.com/grpc/grpc)
```
$ make shared_c static_c
```
@@ -29,40 +38,16 @@ $ make shared_c static_c
$ tools/run_tests/build_python.sh
```
-
-Testing
------------------------
+TESTING
+-------
- Use run_python.sh to run gRPC as it was installed into the virtual environment
```
$ tools/run_tests/run_python.sh
```
-
-Installing
------------------------
-
-- Install the gRPC core
- - [Debian package](https://github.com/grpc/grpc/releases)
- ```
- $ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc_0.5.0_amd64.deb
- $ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc-dev_0.5.0_amd64.deb
- $ sudo dpkg -i libgrpc_0.5.0_amd64.deb libgrpc-dev_0.5.0_amd64.deb
- ```
- - [From source](https://github.com/grpc/grpc/blob/master/INSTALL)
-
-- Install gRPC Python's dependencies
-```
-$ pip install -r src/python/requirements.txt
-```
-
-- Install gRPC Python
-```
-$ pip install src/python/src
-```
-
-Packaging to PyPI
------------------------
+PACKAGING
+---------
- Install packaging dependencies
```
@@ -73,3 +58,9 @@ $ pip install setuptools twine
```
$ ../../tools/distrib/python/submit.py
```
+
+[homebrew]:http://brew.sh
+[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
+[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
+[Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html
+[detailed example]:http://www.grpc.io/docs/installation/python.html
diff --git a/src/python/requirements.txt b/src/python/requirements.txt
index d32d436d3c..43395df03b 100644
--- a/src/python/requirements.txt
+++ b/src/python/requirements.txt
@@ -1,3 +1,3 @@
enum34==1.0.4
futures==2.2.0
-protobuf==3.0.0a2
+protobuf==3.0.0a3
diff --git a/src/python/src/README.rst b/src/python/src/README.rst
index bc1815febc..00bdecf56f 100644
--- a/src/python/src/README.rst
+++ b/src/python/src/README.rst
@@ -6,22 +6,18 @@ Package for GRPC Python.
Dependencies
------------
-Ensure that you have installed GRPC core.
-
-On debian linux systems, install from our released deb package:
+Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. On Linux, install linuxbrew_.
+Run the following command to install gRPC Python.
::
- $ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc_0.5.0_amd64.deb
- $ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc-dev_0.5.0_amd64.deb
- $ sudo dpkg -i libgrpc_0.5.0_amd64.deb libgrpc-dev_0.5.0_amd64.deb
-
-Otherwise, install from source:
+ $ curl -fsSL https://goo.gl/getgrpc | bash -s python
-::
+This will download and run the [gRPC install script][] to install grpc core. The script then uses pip to install this package. It also installs the Protocol Buffers compiler (_protoc_) and the gRPC _protoc_ plugin for python.
- git clone https://github.com/grpc/grpc.git
- cd grpc
- ./configure
- make && make install
+Otherwise, `install from source`_
+.. _`install from source`: https://github.com/grpc/grpc/blob/master/src/python/README.md#building-from-source
+.. _homebrew: http://brew.sh
+.. _linuxbrew: https://github.com/Homebrew/linuxbrew#installation
+.. _`gRPC install script`: https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
diff --git a/src/python/src/grpc/_adapter/_c/types.h b/src/python/src/grpc/_adapter/_c/types.h
index e189ae2566..3449f0643f 100644
--- a/src/python/src/grpc/_adapter/_c/types.h
+++ b/src/python/src/grpc/_adapter/_c/types.h
@@ -241,10 +241,10 @@ double pygrpc_cast_gpr_timespec_to_double(gpr_timespec timespec);
gpr_timespec pygrpc_cast_double_to_gpr_timespec(double seconds);
/* Returns true on success, false on failure. */
-int pygrpc_cast_pylist_to_send_metadata(
- PyObject *pylist, grpc_metadata **metadata, size_t *count);
+int pygrpc_cast_pyseq_to_send_metadata(
+ PyObject *pyseq, grpc_metadata **metadata, size_t *count);
/* Returns a metadata array as a Python object on success, else NULL. */
-PyObject *pygrpc_cast_metadata_array_to_pylist(grpc_metadata_array metadata);
+PyObject *pygrpc_cast_metadata_array_to_pyseq(grpc_metadata_array metadata);
/* Transliterate from a list of python channel arguments (2-tuples of string
and string|integer|None) to a grpc_channel_args object. The strings placed
diff --git a/src/python/src/grpc/_adapter/_c/types/server.c b/src/python/src/grpc/_adapter/_c/types/server.c
index 65d84b58fe..2a00f34039 100644
--- a/src/python/src/grpc/_adapter/_c/types/server.c
+++ b/src/python/src/grpc/_adapter/_c/types/server.c
@@ -105,6 +105,7 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
}
self = (Server *)type->tp_alloc(type, 0);
self->c_serv = grpc_server_create(&c_args);
+ grpc_server_register_completion_queue(self->c_serv, cq->c_cq);
pygrpc_discard_channel_args(c_args);
self->cq = cq;
Py_INCREF(self->cq);
@@ -167,17 +168,13 @@ PyObject *pygrpc_Server_start(Server *self, PyObject *ignored) {
PyObject *pygrpc_Server_shutdown(
Server *self, PyObject *args, PyObject *kwargs) {
- PyObject *user_tag = NULL;
+ PyObject *user_tag;
pygrpc_tag *tag;
static char *keywords[] = {"tag", NULL};
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O", keywords, &user_tag)) {
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", keywords, &user_tag)) {
return NULL;
}
- if (user_tag) {
- tag = pygrpc_produce_server_shutdown_tag(user_tag);
- grpc_server_shutdown_and_notify(self->c_serv, tag);
- } else {
- grpc_server_shutdown(self->c_serv);
- }
+ tag = pygrpc_produce_server_shutdown_tag(user_tag);
+ grpc_server_shutdown_and_notify(self->c_serv, self->cq->c_cq, tag);
Py_RETURN_NONE;
}
diff --git a/src/python/src/grpc/_adapter/_c/utility.c b/src/python/src/grpc/_adapter/_c/utility.c
index 6d228c73fe..480a720c21 100644
--- a/src/python/src/grpc/_adapter/_c/utility.c
+++ b/src/python/src/grpc/_adapter/_c/utility.c
@@ -32,13 +32,16 @@
*/
#include <math.h>
+#include <string.h>
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <grpc/grpc.h>
+#include <grpc/byte_buffer_reader.h>
#include <grpc/support/alloc.h>
#include <grpc/support/slice.h>
#include <grpc/support/time.h>
+#include <grpc/support/string_util.h>
#include "grpc/_adapter/_c/types.h"
@@ -116,12 +119,13 @@ PyObject *pygrpc_consume_event(grpc_event event) {
tag->request_call_details.method, tag->request_call_details.host,
pygrpc_cast_gpr_timespec_to_double(tag->request_call_details.deadline),
GRPC_OP_RECV_INITIAL_METADATA,
- pygrpc_cast_metadata_array_to_pylist(tag->request_metadata), Py_None,
+ pygrpc_cast_metadata_array_to_pyseq(tag->request_metadata), Py_None,
Py_None, Py_None, Py_None,
event.success ? Py_True : Py_False);
} else {
result = Py_BuildValue("iOOONO", GRPC_OP_COMPLETE, tag->user_tag,
- tag->call, Py_None, pygrpc_consume_ops(tag->ops, tag->nops),
+ tag->call ? (PyObject*)tag->call : Py_None, Py_None,
+ pygrpc_consume_ops(tag->ops, tag->nops),
event.success ? Py_True : Py_False);
}
break;
@@ -144,29 +148,32 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
static const int STATUS_INDEX = 4;
static const int STATUS_CODE_INDEX = 0;
static const int STATUS_DETAILS_INDEX = 1;
+ int type;
+ Py_ssize_t message_size;
+ char *message;
+ char *status_details;
+ gpr_slice message_slice;
grpc_op c_op;
if (!PyTuple_Check(op)) {
PyErr_SetString(PyExc_TypeError, "expected tuple op");
return 0;
}
if (PyTuple_Size(op) != OP_TUPLE_SIZE) {
- char buf[64];
- snprintf(buf, sizeof(buf), "expected tuple op of length %d", OP_TUPLE_SIZE);
+ char *buf;
+ gpr_asprintf(&buf, "expected tuple op of length %d", OP_TUPLE_SIZE);
PyErr_SetString(PyExc_ValueError, buf);
+ gpr_free(buf);
return 0;
}
- int type = PyInt_AsLong(PyTuple_GET_ITEM(op, TYPE_INDEX));
+ type = PyInt_AsLong(PyTuple_GET_ITEM(op, TYPE_INDEX));
if (PyErr_Occurred()) {
return 0;
}
- Py_ssize_t message_size;
- char *message;
- char *status_details;
- gpr_slice message_slice;
c_op.op = type;
+ c_op.flags = 0;
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
- if (!pygrpc_cast_pylist_to_send_metadata(
+ if (!pygrpc_cast_pyseq_to_send_metadata(
PyTuple_GetItem(op, INITIAL_METADATA_INDEX),
&c_op.data.send_initial_metadata.metadata,
&c_op.data.send_initial_metadata.count)) {
@@ -177,24 +184,25 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
PyString_AsStringAndSize(
PyTuple_GET_ITEM(op, MESSAGE_INDEX), &message, &message_size);
message_slice = gpr_slice_from_copied_buffer(message, message_size);
- c_op.data.send_message = grpc_byte_buffer_create(&message_slice, 1);
+ c_op.data.send_message = grpc_raw_byte_buffer_create(&message_slice, 1);
gpr_slice_unref(message_slice);
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Don't need to fill in any other fields. */
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
- if (!pygrpc_cast_pylist_to_send_metadata(
+ if (!pygrpc_cast_pyseq_to_send_metadata(
PyTuple_GetItem(op, TRAILING_METADATA_INDEX),
&c_op.data.send_status_from_server.trailing_metadata,
&c_op.data.send_status_from_server.trailing_metadata_count)) {
return 0;
}
if (!PyTuple_Check(PyTuple_GET_ITEM(op, STATUS_INDEX))) {
- char buf[64];
- snprintf(buf, sizeof(buf), "expected tuple status in op of length %d",
- STATUS_TUPLE_SIZE);
- PyErr_SetString(PyExc_TypeError, buf);
+ char *buf;
+ gpr_asprintf(&buf, "expected tuple status in op of length %d",
+ STATUS_TUPLE_SIZE);
+ PyErr_SetString(PyExc_ValueError, buf);
+ gpr_free(buf);
return 0;
}
c_op.data.send_status_from_server.status = PyInt_AsLong(
@@ -240,8 +248,16 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
}
void pygrpc_discard_op(grpc_op op) {
+ size_t i;
switch(op.op) {
case GRPC_OP_SEND_INITIAL_METADATA:
+ /* Whenever we produce send-metadata, we allocate new strings (to handle
+ arbitrary sequence input as opposed to just lists or just tuples). We
+ thus must free those elements. */
+ for (i = 0; i < op.data.send_initial_metadata.count; ++i) {
+ gpr_free((void *)op.data.send_initial_metadata.metadata[i].key);
+ gpr_free((void *)op.data.send_initial_metadata.metadata[i].value);
+ }
gpr_free(op.data.send_initial_metadata.metadata);
break;
case GRPC_OP_SEND_MESSAGE:
@@ -251,6 +267,16 @@ void pygrpc_discard_op(grpc_op op) {
/* Don't need to free any fields. */
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ /* Whenever we produce send-metadata, we allocate new strings (to handle
+ arbitrary sequence input as opposed to just lists or just tuples). We
+ thus must free those elements. */
+ for (i = 0; i < op.data.send_status_from_server.trailing_metadata_count;
+ ++i) {
+ gpr_free(
+ (void *)op.data.send_status_from_server.trailing_metadata[i].key);
+ gpr_free(
+ (void *)op.data.send_status_from_server.trailing_metadata[i].value);
+ }
gpr_free(op.data.send_status_from_server.trailing_metadata);
gpr_free((char *)op.data.send_status_from_server.status_details);
break;
@@ -351,9 +377,14 @@ double pygrpc_cast_gpr_timespec_to_double(gpr_timespec timespec) {
return timespec.tv_sec + 1e-9*timespec.tv_nsec;
}
+/* Because C89 doesn't have a way to check for infinity... */
+static int pygrpc_isinf(double x) {
+ return x * 0 != 0;
+}
+
gpr_timespec pygrpc_cast_double_to_gpr_timespec(double seconds) {
gpr_timespec result;
- if isinf(seconds) {
+ if (pygrpc_isinf(seconds)) {
result = seconds > 0.0 ? gpr_inf_future : gpr_inf_past;
} else {
result.tv_sec = (time_t)seconds;
@@ -365,7 +396,9 @@ gpr_timespec pygrpc_cast_double_to_gpr_timespec(double seconds) {
int pygrpc_produce_channel_args(PyObject *py_args, grpc_channel_args *c_args) {
size_t num_args = PyList_Size(py_args);
size_t i;
- grpc_channel_args args = {num_args, gpr_malloc(sizeof(grpc_arg) * num_args)};
+ grpc_channel_args args;
+ args.num_args = num_args;
+ args.args = gpr_malloc(sizeof(grpc_arg) * num_args);
for (i = 0; i < args.num_args; ++i) {
char *key;
PyObject *value;
@@ -405,31 +438,41 @@ void pygrpc_discard_channel_args(grpc_channel_args args) {
gpr_free(args.args);
}
-int pygrpc_cast_pylist_to_send_metadata(
- PyObject *pylist, grpc_metadata **metadata, size_t *count) {
+int pygrpc_cast_pyseq_to_send_metadata(
+ PyObject *pyseq, grpc_metadata **metadata, size_t *count) {
size_t i;
Py_ssize_t value_length;
- *count = PyList_Size(pylist);
+ char *key;
+ char *value;
+ if (!PySequence_Check(pyseq)) {
+ return 0;
+ }
+ *count = PySequence_Size(pyseq);
*metadata = gpr_malloc(sizeof(grpc_metadata) * *count);
for (i = 0; i < *count; ++i) {
- if (!PyArg_ParseTuple(
- PyList_GetItem(pylist, i), "ss#",
- &(*metadata)[i].key, &(*metadata)[i].value, &value_length)) {
+ PyObject *item = PySequence_GetItem(pyseq, i);
+ if (!PyArg_ParseTuple(item, "ss#", &key, &value, &value_length)) {
+ Py_DECREF(item);
gpr_free(*metadata);
*count = 0;
*metadata = NULL;
return 0;
+ } else {
+ (*metadata)[i].key = gpr_strdup(key);
+ (*metadata)[i].value = gpr_malloc(value_length);
+ memcpy((void *)(*metadata)[i].value, value, value_length);
+ Py_DECREF(item);
}
(*metadata)[i].value_length = value_length;
}
return 1;
}
-PyObject *pygrpc_cast_metadata_array_to_pylist(grpc_metadata_array metadata) {
- PyObject *result = PyList_New(metadata.count);
+PyObject *pygrpc_cast_metadata_array_to_pyseq(grpc_metadata_array metadata) {
+ PyObject *result = PyTuple_New(metadata.count);
size_t i;
for (i = 0; i < metadata.count; ++i) {
- PyList_SetItem(
+ PyTuple_SetItem(
result, i, Py_BuildValue(
"ss#", metadata.metadata[i].key, metadata.metadata[i].value,
(Py_ssize_t)metadata.metadata[i].value_length));
@@ -443,18 +486,18 @@ PyObject *pygrpc_cast_metadata_array_to_pylist(grpc_metadata_array metadata) {
void pygrpc_byte_buffer_to_bytes(
grpc_byte_buffer *buffer, char **result, size_t *result_size) {
- grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer);
+ grpc_byte_buffer_reader reader;
+ grpc_byte_buffer_reader_init(&reader, buffer);
gpr_slice slice;
char *read_result = NULL;
size_t size = 0;
- while (grpc_byte_buffer_reader_next(reader, &slice)) {
+ while (grpc_byte_buffer_reader_next(&reader, &slice)) {
read_result = gpr_realloc(read_result, size + GPR_SLICE_LENGTH(slice));
memcpy(read_result + size, GPR_SLICE_START_PTR(slice),
GPR_SLICE_LENGTH(slice));
size = size + GPR_SLICE_LENGTH(slice);
gpr_slice_unref(slice);
}
- grpc_byte_buffer_reader_destroy(reader);
*result_size = size;
*result = read_result;
}
diff --git a/src/python/src/grpc/_adapter/_intermediary_low.py b/src/python/src/grpc/_adapter/_intermediary_low.py
index a6e325c4e5..3c7f0a2619 100644
--- a/src/python/src/grpc/_adapter/_intermediary_low.py
+++ b/src/python/src/grpc/_adapter/_intermediary_low.py
@@ -100,7 +100,7 @@ class _TagAdapter(collections.namedtuple('_TagAdapter', [
class Call(object):
"""Adapter from old _low.Call interface to new _low.Call."""
-
+
def __init__(self, channel, completion_queue, method, host, deadline):
self._internal = channel._internal.create_call(
completion_queue._internal, method, host, deadline)
@@ -144,10 +144,11 @@ class Call(object):
self._metadata.append((key, value))
def premetadata(self):
- return self._internal.start_batch([
+ result = self._internal.start_batch([
_types.OpArgs.send_initial_metadata(self._metadata)
], _IGNORE_ME_TAG)
self._metadata = []
+ return result
def read(self, tag):
return self._internal.start_batch([
@@ -207,7 +208,7 @@ class CompletionQueue(object):
complete_accepted = ev.success if kind == Event.Kind.COMPLETE_ACCEPTED else None
service_acceptance = ServiceAcceptance(Call._from_internal(ev.call), ev.call_details.method, ev.call_details.host, ev.call_details.deadline) if kind == Event.Kind.SERVICE_ACCEPTED else None
message_bytes = ev.results[0].message if kind == Event.Kind.READ_ACCEPTED else None
- status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if ev.results[0].cancelled is not None else None
+ status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if len(ev.results) > 0 and ev.results[0].cancelled is not None else None
metadata = ev.results[0].initial_metadata if (kind in [Event.Kind.SERVICE_ACCEPTED, Event.Kind.METADATA_ACCEPTED]) else (ev.results[0].trailing_metadata if kind == Event.Kind.FINISH else None)
else:
raise RuntimeError('unknown event')
@@ -241,7 +242,7 @@ class Server(object):
return self._internal.request_call(self._internal_cq, _TagAdapter(tag, Event.Kind.SERVICE_ACCEPTED))
def stop(self):
- return self._internal.shutdown()
+ return self._internal.shutdown(_TagAdapter(None, Event.Kind.STOP))
class ClientCredentials(object):
@@ -253,6 +254,6 @@ class ClientCredentials(object):
class ServerCredentials(object):
"""Adapter from old _low.ServerCredentials interface to new _low.ServerCredentials."""
-
+
def __init__(self, root_credentials, pair_sequence):
self._internal = _low.ServerCredentials.ssl(root_credentials, list(pair_sequence))
diff --git a/src/python/src/grpc/_adapter/_intermediary_low_test.py b/src/python/src/grpc/_adapter/_intermediary_low_test.py
index 6ff51c43a6..27a5b82e9c 100644
--- a/src/python/src/grpc/_adapter/_intermediary_low_test.py
+++ b/src/python/src/grpc/_adapter/_intermediary_low_test.py
@@ -29,6 +29,8 @@
"""Tests for the old '_low'."""
+import Queue
+import threading
import time
import unittest
@@ -43,6 +45,7 @@ _BYTE_SEQUENCE_SEQUENCE = tuple(
bytes(bytearray((row + column) % 256 for column in range(row)))
for row in range(_STREAM_LENGTH))
+
class LonelyClientTest(unittest.TestCase):
def testLonelyClient(self):
@@ -79,6 +82,14 @@ class LonelyClientTest(unittest.TestCase):
del completion_queue
+def _drive_completion_queue(completion_queue, event_queue):
+ while True:
+ event = completion_queue.get(_FUTURE)
+ if event.kind is _low.Event.Kind.STOP:
+ break
+ event_queue.put(event)
+
+
class EchoTest(unittest.TestCase):
def setUp(self):
@@ -88,32 +99,27 @@ class EchoTest(unittest.TestCase):
self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0')
self.server.start()
+ self.server_events = Queue.Queue()
+ self.server_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.server_completion_queue, self.server_events))
+ self.server_completion_queue_thread.start()
self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None)
+ self.client_events = Queue.Queue()
+ self.client_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.client_completion_queue, self.client_events))
+ self.client_completion_queue_thread.start()
def tearDown(self):
self.server.stop()
- # NOTE(nathaniel): Yep, this is weird; it's a consequence of
- # grpc_server_destroy's being what has the effect of telling the server's
- # completion queue to pump out all pending events/tags immediately rather
- # than gracefully completing all outstanding RPCs while accepting no new
- # ones.
- # TODO(nathaniel): Deallocation of a Python object shouldn't have this kind
- # of observable side effect let alone such an important one.
- del self.server
self.server_completion_queue.stop()
self.client_completion_queue.stop()
- while True:
- event = self.server_completion_queue.get(_FUTURE)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
- while True:
- event = self.client_completion_queue.get(_FUTURE)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
- self.server_completion_queue = None
- self.client_completion_queue = None
+ self.server_completion_queue_thread.join()
+ self.client_completion_queue_thread.join()
+ del self.server
def _perform_echo_test(self, test_data):
method = 'test method'
@@ -151,7 +157,7 @@ class EchoTest(unittest.TestCase):
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag)
- service_accepted = self.server_completion_queue.get(_FUTURE)
+ service_accepted = self.server_events.get()
self.assertIsNotNone(service_accepted)
self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
self.assertIs(service_accepted.tag, service_tag)
@@ -172,7 +178,7 @@ class EchoTest(unittest.TestCase):
server_leading_binary_metadata_value)
server_call.premetadata()
- metadata_accepted = self.client_completion_queue.get(_FUTURE)
+ metadata_accepted = self.client_events.get()
self.assertIsNotNone(metadata_accepted)
self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
self.assertEqual(metadata_tag, metadata_accepted.tag)
@@ -186,14 +192,14 @@ class EchoTest(unittest.TestCase):
for datum in test_data:
client_call.write(datum, write_tag)
- write_accepted = self.client_completion_queue.get(_FUTURE)
+ write_accepted = self.client_events.get()
self.assertIsNotNone(write_accepted)
self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
self.assertIs(write_accepted.tag, write_tag)
self.assertIs(write_accepted.write_accepted, True)
server_call.read(read_tag)
- read_accepted = self.server_completion_queue.get(_FUTURE)
+ read_accepted = self.server_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
@@ -201,14 +207,14 @@ class EchoTest(unittest.TestCase):
server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag)
- write_accepted = self.server_completion_queue.get(_FUTURE)
+ write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
self.assertEqual(write_tag, write_accepted.tag)
self.assertTrue(write_accepted.write_accepted)
client_call.read(read_tag)
- read_accepted = self.client_completion_queue.get(_FUTURE)
+ read_accepted = self.client_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
@@ -216,14 +222,14 @@ class EchoTest(unittest.TestCase):
client_data.append(read_accepted.bytes)
client_call.complete(complete_tag)
- complete_accepted = self.client_completion_queue.get(_FUTURE)
+ complete_accepted = self.client_events.get()
self.assertIsNotNone(complete_accepted)
self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
self.assertIs(complete_accepted.tag, complete_tag)
self.assertIs(complete_accepted.complete_accepted, True)
server_call.read(read_tag)
- read_accepted = self.server_completion_queue.get(_FUTURE)
+ read_accepted = self.server_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
@@ -235,8 +241,8 @@ class EchoTest(unittest.TestCase):
server_trailing_binary_metadata_value)
server_call.status(_low.Status(_low.Code.OK, details), status_tag)
- server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
- server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
+ server_terminal_event_one = self.server_events.get()
+ server_terminal_event_two = self.server_events.get()
if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
status_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two
@@ -253,8 +259,8 @@ class EchoTest(unittest.TestCase):
self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
client_call.read(read_tag)
- client_terminal_event_one = self.client_completion_queue.get(_FUTURE)
- client_terminal_event_two = self.client_completion_queue.get(_FUTURE)
+ client_terminal_event_one = self.client_events.get()
+ client_terminal_event_two = self.client_events.get()
if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = client_terminal_event_one
finish_accepted = client_terminal_event_two
@@ -276,6 +282,9 @@ class EchoTest(unittest.TestCase):
self.assertIn(server_trailing_binary_metadata_key, metadata)
self.assertEqual(server_trailing_binary_metadata_value,
metadata[server_trailing_binary_metadata_key])
+ self.assertSetEqual(set(key for key, _ in finish_accepted.metadata),
+ set((server_trailing_metadata_key,
+ server_trailing_binary_metadata_key,)))
server_timeout_none_event = self.server_completion_queue.get(0)
self.assertIsNone(server_timeout_none_event)
@@ -310,23 +319,27 @@ class CancellationTest(unittest.TestCase):
self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0')
self.server.start()
+ self.server_events = Queue.Queue()
+ self.server_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.server_completion_queue, self.server_events))
+ self.server_completion_queue_thread.start()
self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None)
+ self.client_events = Queue.Queue()
+ self.client_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.client_completion_queue, self.client_events))
+ self.client_completion_queue_thread.start()
def tearDown(self):
self.server.stop()
- del self.server
self.server_completion_queue.stop()
self.client_completion_queue.stop()
- while True:
- event = self.server_completion_queue.get(0)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
- while True:
- event = self.client_completion_queue.get(0)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
+ self.server_completion_queue_thread.join()
+ self.client_completion_queue_thread.join()
+ del self.server
def testCancellation(self):
method = 'test method'
@@ -347,29 +360,29 @@ class CancellationTest(unittest.TestCase):
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag)
- service_accepted = self.server_completion_queue.get(_FUTURE)
+ service_accepted = self.server_events.get()
server_call = service_accepted.service_acceptance.call
server_call.accept(self.server_completion_queue, finish_tag)
server_call.premetadata()
- metadata_accepted = self.client_completion_queue.get(_FUTURE)
+ metadata_accepted = self.client_events.get()
self.assertIsNotNone(metadata_accepted)
for datum in test_data:
client_call.write(datum, write_tag)
- write_accepted = self.client_completion_queue.get(_FUTURE)
+ write_accepted = self.client_events.get()
server_call.read(read_tag)
- read_accepted = self.server_completion_queue.get(_FUTURE)
+ read_accepted = self.server_events.get()
server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag)
- write_accepted = self.server_completion_queue.get(_FUTURE)
+ write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
client_call.read(read_tag)
- read_accepted = self.client_completion_queue.get(_FUTURE)
+ read_accepted = self.client_events.get()
client_data.append(read_accepted.bytes)
client_call.cancel()
@@ -380,8 +393,8 @@ class CancellationTest(unittest.TestCase):
server_call.read(read_tag)
- server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
- server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
+ server_terminal_event_one = self.server_events.get()
+ server_terminal_event_two = self.server_events.get()
if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two
@@ -395,7 +408,7 @@ class CancellationTest(unittest.TestCase):
self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
- finish_event = self.client_completion_queue.get(_FUTURE)
+ finish_event = self.client_events.get()
self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'),
finish_event.status)
diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py
index 50257d8691..4729b84f84 100644
--- a/src/python/src/grpc/_adapter/_links_test.py
+++ b/src/python/src/grpc/_adapter/_links_test.py
@@ -40,7 +40,7 @@ from grpc.framework.base import interfaces
from grpc.framework.foundation import logging_pool
_IDENTITY = lambda x: x
-_TIMEOUT = 2
+_TIMEOUT = 32
# TODO(nathaniel): End-to-end metadata testing.
diff --git a/src/python/src/grpc/_adapter/_low.py b/src/python/src/grpc/_adapter/_low.py
index 0c1d3b40a5..dcf67dbc11 100644
--- a/src/python/src/grpc/_adapter/_low.py
+++ b/src/python/src/grpc/_adapter/_low.py
@@ -101,11 +101,8 @@ class Server(_types.Server):
def start(self):
return self.server.start()
- def shutdown(self, tag=_NO_TAG):
- if tag is _NO_TAG:
- return self.server.shutdown()
- else:
- return self.server.shutdown(tag)
+ def shutdown(self, tag=None):
+ return self.server.shutdown(tag)
def request_call(self, completion_queue, tag):
return self.server.request_call(completion_queue.completion_queue, tag)
diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py
index e53b176caf..268e5fe765 100644
--- a/src/python/src/grpc/_adapter/_low_test.py
+++ b/src/python/src/grpc/_adapter/_low_test.py
@@ -27,6 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+import threading
import time
import unittest
@@ -34,6 +35,33 @@ from grpc._adapter import _types
from grpc._adapter import _low
+def WaitForEvents(completion_queues, deadline):
+ """
+ Args:
+ completion_queues: list of completion queues to wait for events on
+ deadline: absolute deadline to wait until
+
+ Returns:
+ a sequence of events of length len(completion_queues).
+ """
+
+ results = [None] * len(completion_queues)
+ lock = threading.Lock()
+ threads = []
+ def set_ith_result(i, completion_queue):
+ result = completion_queue.next(deadline)
+ with lock:
+ print i, completion_queue, result, time.time() - deadline
+ results[i] = result
+ for i, completion_queue in enumerate(completion_queues):
+ thread = threading.Thread(target=set_ith_result,
+ args=[i, completion_queue])
+ thread.start()
+ threads.append(thread)
+ for thread in threads:
+ thread.join()
+ return results
+
class InsecureServerInsecureClient(unittest.TestCase):
def setUp(self):
@@ -48,7 +76,6 @@ class InsecureServerInsecureClient(unittest.TestCase):
def tearDown(self):
self.server.shutdown()
del self.client_channel
- del self.server
self.client_completion_queue.shutdown()
while self.client_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN:
@@ -59,6 +86,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
del self.client_completion_queue
del self.server_completion_queue
+ del self.server
def testEcho(self):
DEADLINE = time.time()+5
@@ -95,7 +123,8 @@ class InsecureServerInsecureClient(unittest.TestCase):
], client_call_tag)
self.assertEquals(_types.CallError.OK, client_start_batch_result)
- request_event = self.server_completion_queue.next(DEADLINE)
+ client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
+ self.assertEquals(client_no_event, None)
self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type)
self.assertIsInstance(request_event.call, _low.Call)
self.assertIs(server_request_tag, request_event.tag)
@@ -118,8 +147,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
], server_call_tag)
self.assertEquals(_types.CallError.OK, server_start_batch_result)
- client_event = self.client_completion_queue.next(DEADLINE)
- server_event = self.server_completion_queue.next(DEADLINE)
+ client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
self.assertEquals(6, len(client_event.results))
found_client_op_types = set()
diff --git a/src/python/src/setup.py b/src/python/src/setup.py
index dc655a70f9..5398b09936 100644
--- a/src/python/src/setup.py
+++ b/src/python/src/setup.py
@@ -86,13 +86,13 @@ _PACKAGE_DIRECTORIES = {
setuptools.setup(
name='grpcio',
- version='0.9.0a0',
+ version='0.9.0a1',
ext_modules=[_EXTENSION_MODULE],
packages=list(_PACKAGES),
package_dir=_PACKAGE_DIRECTORIES,
install_requires=[
'enum34==1.0.4',
'futures==2.2.0',
- 'protobuf==3.0.0a2'
+ 'protobuf==3.0.0a3'
]
)