diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-06-16 17:08:30 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-06-16 17:08:30 -0700 |
commit | bd6d3d0499a6485f91d26b719a0d880bc36541cd (patch) | |
tree | f905fe1e948bdad5b89993683d21f974e55a23f6 | |
parent | 4b01f6e08597ab4142fe9df00bbf51e444226a8c (diff) | |
parent | 4c32de585a0f95139d0c2487bc4f4dc954ed2be7 (diff) |
Merge branch 'we-dont-need-no-backup' of github.com:ctiller/grpc into we-dont-need-no-backup
-rw-r--r-- | include/grpc/grpc.h | 5 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 5 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 5 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 5 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 5 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 3 | ||||
-rw-r--r-- | src/core/surface/server.c | 7 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/_c/types/server.c | 1 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/_low_test.py | 34 |
9 files changed, 60 insertions, 10 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 0eed892db9..a9cbf7e192 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -144,7 +144,10 @@ typedef enum grpc_call_error { /* the flags value was illegal for this call */ GRPC_CALL_ERROR_INVALID_FLAGS, /* invalid metadata was passed to this call */ - GRPC_CALL_ERROR_INVALID_METADATA + GRPC_CALL_ERROR_INVALID_METADATA, + /* completion queue for notification has not been registered with the server + */ + GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE } grpc_call_error; /* Write Flags: */ diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 8e585a007d..e5e3435feb 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -100,8 +100,9 @@ static int multipoll_with_epoll_pollset_maybe_work( if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { timeout_ms = -1; } else { - timeout_ms = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout_ms <= 0) { + timeout_ms = gpr_time_to_millis( + gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500))); + if (timeout_ms < 0) { return 1; } } diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 5ee6980732..d21c52c0f0 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -116,8 +116,9 @@ static int multipoll_with_poll_pollset_maybe_work( if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { timeout = -1; } else { - timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout <= 0) { + timeout = gpr_time_to_millis( + gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500))); + if (timeout < 0) { return 1; } } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index db704b9df1..0fe54c8f1d 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -346,8 +346,9 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset, if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { timeout = -1; } else { - timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout <= 0) { + timeout = gpr_time_to_millis( + gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500))); + if (timeout < 0) { return 1; } } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 57ecf365cc..bd0fabf9da 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -73,6 +73,7 @@ struct grpc_completion_queue { event *queue; /* Fixed size chained hash table of events for pluck() */ event *buckets[NUM_TAG_BUCKETS]; + int is_server_cq; }; grpc_completion_queue *grpc_completion_queue_create(void) { @@ -323,3 +324,7 @@ void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) { gpr_time_add(gpr_now(), gpr_time_from_millis(100))); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } + +void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } + +int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 2249d0e789..e76910c00b 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -63,4 +63,7 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc); +void grpc_cq_mark_server_cq(grpc_completion_queue *cc); +int grpc_cq_is_server_cq(grpc_completion_queue *cc); + #endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 3671efe0d0..10cb8538ac 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -709,6 +709,7 @@ void grpc_server_register_completion_queue(grpc_server *server, if (server->cqs[i] == cq) return; } GRPC_CQ_INTERNAL_REF(cq, "server"); + grpc_cq_mark_server_cq(cq); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); @@ -1081,6 +1082,9 @@ grpc_call_error grpc_server_request_call( GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag); + if (!grpc_cq_is_server_cq(cq_for_notification)) { + return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + } grpc_cq_begin_op(cq_for_notification, NULL); rc.type = BATCH_CALL; rc.tag = tag; @@ -1099,6 +1103,9 @@ grpc_call_error grpc_server_request_registered_call( grpc_completion_queue *cq_for_notification, void *tag) { requested_call rc; registered_method *registered_method = rm; + if (!grpc_cq_is_server_cq(cq_for_notification)) { + return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + } grpc_cq_begin_op(cq_for_notification, NULL); rc.type = REGISTERED_CALL; rc.tag = tag; diff --git a/src/python/src/grpc/_adapter/_c/types/server.c b/src/python/src/grpc/_adapter/_c/types/server.c index 26b38da8f1..2a00f34039 100644 --- a/src/python/src/grpc/_adapter/_c/types/server.c +++ b/src/python/src/grpc/_adapter/_c/types/server.c @@ -105,6 +105,7 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) } self = (Server *)type->tp_alloc(type, 0); self->c_serv = grpc_server_create(&c_args); + grpc_server_register_completion_queue(self->c_serv, cq->c_cq); pygrpc_discard_channel_args(c_args); self->cq = cq; Py_INCREF(self->cq); diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py index 8a9f1a0c49..268e5fe765 100644 --- a/src/python/src/grpc/_adapter/_low_test.py +++ b/src/python/src/grpc/_adapter/_low_test.py @@ -27,6 +27,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import threading import time import unittest @@ -34,6 +35,33 @@ from grpc._adapter import _types from grpc._adapter import _low +def WaitForEvents(completion_queues, deadline): + """ + Args: + completion_queues: list of completion queues to wait for events on + deadline: absolute deadline to wait until + + Returns: + a sequence of events of length len(completion_queues). + """ + + results = [None] * len(completion_queues) + lock = threading.Lock() + threads = [] + def set_ith_result(i, completion_queue): + result = completion_queue.next(deadline) + with lock: + print i, completion_queue, result, time.time() - deadline + results[i] = result + for i, completion_queue in enumerate(completion_queues): + thread = threading.Thread(target=set_ith_result, + args=[i, completion_queue]) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() + return results + class InsecureServerInsecureClient(unittest.TestCase): def setUp(self): @@ -95,7 +123,8 @@ class InsecureServerInsecureClient(unittest.TestCase): ], client_call_tag) self.assertEquals(_types.CallError.OK, client_start_batch_result) - request_event = self.server_completion_queue.next(DEADLINE) + client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2) + self.assertEquals(client_no_event, None) self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type) self.assertIsInstance(request_event.call, _low.Call) self.assertIs(server_request_tag, request_event.tag) @@ -118,8 +147,7 @@ class InsecureServerInsecureClient(unittest.TestCase): ], server_call_tag) self.assertEquals(_types.CallError.OK, server_start_batch_result) - client_event = self.client_completion_queue.next(DEADLINE) - server_event = self.server_completion_queue.next(DEADLINE) + client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1) self.assertEquals(6, len(client_event.results)) found_client_op_types = set() |