aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-16 17:08:30 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-16 17:08:30 -0700
commitbd6d3d0499a6485f91d26b719a0d880bc36541cd (patch)
treef905fe1e948bdad5b89993683d21f974e55a23f6
parent4b01f6e08597ab4142fe9df00bbf51e444226a8c (diff)
parent4c32de585a0f95139d0c2487bc4f4dc954ed2be7 (diff)
Merge branch 'we-dont-need-no-backup' of github.com:ctiller/grpc into we-dont-need-no-backup
-rw-r--r--include/grpc/grpc.h5
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c5
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c5
-rw-r--r--src/core/iomgr/pollset_posix.c5
-rw-r--r--src/core/surface/completion_queue.c5
-rw-r--r--src/core/surface/completion_queue.h3
-rw-r--r--src/core/surface/server.c7
-rw-r--r--src/python/src/grpc/_adapter/_c/types/server.c1
-rw-r--r--src/python/src/grpc/_adapter/_low_test.py34
9 files changed, 60 insertions, 10 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 0eed892db9..a9cbf7e192 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -144,7 +144,10 @@ typedef enum grpc_call_error {
/* the flags value was illegal for this call */
GRPC_CALL_ERROR_INVALID_FLAGS,
/* invalid metadata was passed to this call */
- GRPC_CALL_ERROR_INVALID_METADATA
+ GRPC_CALL_ERROR_INVALID_METADATA,
+ /* completion queue for notification has not been registered with the server
+ */
+ GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE
} grpc_call_error;
/* Write Flags: */
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 8e585a007d..e5e3435feb 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -100,8 +100,9 @@ static int multipoll_with_epoll_pollset_maybe_work(
if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
timeout_ms = -1;
} else {
- timeout_ms = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout_ms <= 0) {
+ timeout_ms = gpr_time_to_millis(
+ gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500)));
+ if (timeout_ms < 0) {
return 1;
}
}
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 5ee6980732..d21c52c0f0 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -116,8 +116,9 @@ static int multipoll_with_poll_pollset_maybe_work(
if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
timeout = -1;
} else {
- timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout <= 0) {
+ timeout = gpr_time_to_millis(
+ gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500)));
+ if (timeout < 0) {
return 1;
}
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index db704b9df1..0fe54c8f1d 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -346,8 +346,9 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset,
if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
timeout = -1;
} else {
- timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout <= 0) {
+ timeout = gpr_time_to_millis(
+ gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500)));
+ if (timeout < 0) {
return 1;
}
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 57ecf365cc..bd0fabf9da 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -73,6 +73,7 @@ struct grpc_completion_queue {
event *queue;
/* Fixed size chained hash table of events for pluck() */
event *buckets[NUM_TAG_BUCKETS];
+ int is_server_cq;
};
grpc_completion_queue *grpc_completion_queue_create(void) {
@@ -323,3 +324,7 @@ void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
+
+void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
+
+int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 2249d0e789..e76910c00b 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -63,4 +63,7 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc);
+void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
+int grpc_cq_is_server_cq(grpc_completion_queue *cc);
+
#endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 3671efe0d0..10cb8538ac 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -709,6 +709,7 @@ void grpc_server_register_completion_queue(grpc_server *server,
if (server->cqs[i] == cq) return;
}
GRPC_CQ_INTERNAL_REF(cq, "server");
+ grpc_cq_mark_server_cq(cq);
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
server->cq_count * sizeof(grpc_completion_queue *));
@@ -1081,6 +1082,9 @@ grpc_call_error grpc_server_request_call(
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
+ if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ }
grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = BATCH_CALL;
rc.tag = tag;
@@ -1099,6 +1103,9 @@ grpc_call_error grpc_server_request_registered_call(
grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
registered_method *registered_method = rm;
+ if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ }
grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = REGISTERED_CALL;
rc.tag = tag;
diff --git a/src/python/src/grpc/_adapter/_c/types/server.c b/src/python/src/grpc/_adapter/_c/types/server.c
index 26b38da8f1..2a00f34039 100644
--- a/src/python/src/grpc/_adapter/_c/types/server.c
+++ b/src/python/src/grpc/_adapter/_c/types/server.c
@@ -105,6 +105,7 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
}
self = (Server *)type->tp_alloc(type, 0);
self->c_serv = grpc_server_create(&c_args);
+ grpc_server_register_completion_queue(self->c_serv, cq->c_cq);
pygrpc_discard_channel_args(c_args);
self->cq = cq;
Py_INCREF(self->cq);
diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py
index 8a9f1a0c49..268e5fe765 100644
--- a/src/python/src/grpc/_adapter/_low_test.py
+++ b/src/python/src/grpc/_adapter/_low_test.py
@@ -27,6 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+import threading
import time
import unittest
@@ -34,6 +35,33 @@ from grpc._adapter import _types
from grpc._adapter import _low
+def WaitForEvents(completion_queues, deadline):
+ """
+ Args:
+ completion_queues: list of completion queues to wait for events on
+ deadline: absolute deadline to wait until
+
+ Returns:
+ a sequence of events of length len(completion_queues).
+ """
+
+ results = [None] * len(completion_queues)
+ lock = threading.Lock()
+ threads = []
+ def set_ith_result(i, completion_queue):
+ result = completion_queue.next(deadline)
+ with lock:
+ print i, completion_queue, result, time.time() - deadline
+ results[i] = result
+ for i, completion_queue in enumerate(completion_queues):
+ thread = threading.Thread(target=set_ith_result,
+ args=[i, completion_queue])
+ thread.start()
+ threads.append(thread)
+ for thread in threads:
+ thread.join()
+ return results
+
class InsecureServerInsecureClient(unittest.TestCase):
def setUp(self):
@@ -95,7 +123,8 @@ class InsecureServerInsecureClient(unittest.TestCase):
], client_call_tag)
self.assertEquals(_types.CallError.OK, client_start_batch_result)
- request_event = self.server_completion_queue.next(DEADLINE)
+ client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
+ self.assertEquals(client_no_event, None)
self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type)
self.assertIsInstance(request_event.call, _low.Call)
self.assertIs(server_request_tag, request_event.tag)
@@ -118,8 +147,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
], server_call_tag)
self.assertEquals(_types.CallError.OK, server_start_batch_result)
- client_event = self.client_completion_queue.next(DEADLINE)
- server_event = self.server_completion_queue.next(DEADLINE)
+ client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
self.assertEquals(6, len(client_event.results))
found_client_op_types = set()