diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-02-12 09:02:56 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-02-12 09:02:56 -0800 |
commit | 20bc56d8908358b1e4f8e8d853a3d3c53f8cb4fe (patch) | |
tree | 2c9e02ebef55da5953a9049c6220757b8a1dbc68 | |
parent | fada7d43b1ee8d128e4ee5267ab859e84ce56752 (diff) |
Multi-completion-queue-server
Allow binding a different completion queue to each registered method.
This will allow multiplexing for the C++ server between sync & async
methods more easily.
-rw-r--r-- | include/grpc/grpc.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server.h | 5 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 11 | ||||
-rw-r--r-- | src/core/security/server_secure_chttp2.c | 5 | ||||
-rw-r--r-- | src/core/surface/server.c | 79 | ||||
-rw-r--r-- | src/core/surface/server.h | 2 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 4 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 2 | ||||
-rw-r--r-- | test/core/iomgr/tcp_server_posix_test.c | 6 |
9 files changed, 77 insertions, 40 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index f6c7da4654..bd766b55b3 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -565,7 +565,8 @@ grpc_call_error grpc_server_request_call( Must be called before grpc_server_start. Returns NULL on failure. */ void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host); + const char *host, + grpc_completion_queue *new_call_cq); /* Request notification of a new pre-registered call */ grpc_call_error grpc_server_request_registered_call( diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 2558a1eb9f..11f9b05663 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -46,8 +46,9 @@ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep); grpc_tcp_server *grpc_tcp_server_create(void); /* Start listening to bound ports */ -void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset *pollset, - grpc_tcp_server_cb cb, void *cb_arg); +void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset **pollsets, + size_t pollset_count, grpc_tcp_server_cb cb, + void *cb_arg); /* Add a port to the server, returning port number on success, or negative on failure. diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 091f0aab1a..e362dd1e6c 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -353,9 +353,10 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) { return (index < s->nports) ? s->ports[index].fd : -1; } -void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset, - grpc_tcp_server_cb cb, void *cb_arg) { - size_t i; +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, + size_t pollset_count, grpc_tcp_server_cb cb, + void *cb_arg) { + size_t i, j; GPR_ASSERT(cb); gpr_mu_lock(&s->mu); GPR_ASSERT(!s->cb); @@ -363,8 +364,8 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset, s->cb = cb; s->cb_arg = cb_arg; for (i = 0; i < s->nports; i++) { - if (pollset) { - grpc_pollset_add_fd(pollset, s->ports[i].emfd); + for (j = 0; j < pollset_count; j++) { + grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd); } grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]); s->active_ports++; diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 480c882794..19056ba23e 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -76,9 +76,10 @@ static void on_accept(void *server, grpc_endpoint *tcp) { /* Note: the following code is the same with server_chttp2.c */ /* Server callback: start listening on our ports */ -static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) { +static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, + size_t pollset_count) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(tcp, pollset, on_accept, server); + grpc_tcp_server_start(tcp, pollsets, pollset_count, on_accept, server); } /* Server callback: destroy the tcp listener (so we don't generate further diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 80b248ee84..22588194ea 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -53,7 +53,7 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; typedef struct listener { void *arg; - void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset); + void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, size_t pollset_count); void (*destroy)(grpc_server *server, void *arg); struct listener *next; } listener; @@ -101,6 +101,7 @@ struct registered_method { char *host; call_data *pending; requested_call_array requested; + grpc_completion_queue *cq; registered_method *next; }; @@ -127,7 +128,11 @@ struct grpc_server { size_t channel_filter_count; const grpc_channel_filter **channel_filters; grpc_channel_args *channel_args; - grpc_completion_queue *cq; + grpc_completion_queue *unregistered_cq; + + grpc_completion_queue **cqs; + grpc_pollset **pollsets; + size_t cq_count; gpr_mu mu; @@ -169,6 +174,7 @@ struct call_data { grpc_mdstr *host; legacy_data *legacy; + grpc_completion_queue *cq_new; call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; @@ -496,7 +502,7 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - int i; + size_t i; gpr_mu_lock(&chand->server->mu); for (i = 0; i < CALL_LIST_COUNT; i++) { @@ -504,7 +510,9 @@ static void destroy_call_elem(grpc_call_element *elem) { } if (chand->server->shutdown && chand->server->have_shutdown_tag && chand->server->lists[ALL_CALLS] == NULL) { - grpc_cq_end_server_shutdown(chand->server->cq, chand->server->shutdown_tag); + for (i = 0; i < chand->server->cq_count; i++) { + grpc_cq_end_server_shutdown(chand->server->cqs[i], chand->server->shutdown_tag); + } } gpr_mu_unlock(&chand->server->mu); @@ -557,6 +565,16 @@ static const grpc_channel_filter server_surface_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server", }; +static void addcq(grpc_server *server, grpc_completion_queue *cq) { + size_t i, n; + for (i = 0; i < server->cq_count; i++) { + if (server->cqs[i] == cq) return; + } + n = server->cq_count++; + server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue*)); + server->cqs[n] = cq; +} + grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, grpc_channel_filter **filters, size_t filter_count, @@ -566,10 +584,11 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, grpc_server *server = gpr_malloc(sizeof(grpc_server)); memset(server, 0, sizeof(grpc_server)); + if (cq) addcq(server, cq); gpr_mu_init(&server->mu); - server->cq = cq; + server->unregistered_cq = cq; /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); server->root_channel_data.next = server->root_channel_data.prev = @@ -605,7 +624,7 @@ static int streq(const char *a, const char *b) { } void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host) { + const char *host, grpc_completion_queue *cq_new_rpc) { registered_method *m; if (!method) { gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); @@ -618,20 +637,28 @@ void *grpc_server_register_method(grpc_server *server, const char *method, return NULL; } } + addcq(server, cq_new_rpc); m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; + m->cq = cq_new_rpc; server->registered_methods = m; return m; } void grpc_server_start(grpc_server *server) { listener *l; + size_t i; + + server->pollsets = gpr_malloc(sizeof(grpc_pollset*) * server->cq_count); + for (i = 0; i < server->cq_count; i++) { + server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); + } for (l = server->listeners; l; l = l->next) { - l->start(server, l->arg, grpc_cq_pollset(server->cq)); + l->start(server, l->arg, server->pollsets, server->cq_count); } } @@ -664,7 +691,9 @@ grpc_transport_setup_result grpc_server_setup_transport( } filters[i] = &grpc_connected_channel_filter; - grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq)); + for (i = 0; i < s->cq_count; i++) { + grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i])); + } channel = grpc_channel_create_from_filters(filters, num_filters, s->channel_args, mdctx, 0); @@ -765,9 +794,11 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, server->have_shutdown_tag = have_shutdown_tag; server->shutdown_tag = shutdown_tag; if (have_shutdown_tag) { - grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN); - if (server->lists[ALL_CALLS] == NULL) { - grpc_cq_end_server_shutdown(server->cq, shutdown_tag); + for (i = 0; i < server->cq_count; i++) { + grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN); + if (server->lists[ALL_CALLS] == NULL) { + grpc_cq_end_server_shutdown(server->cqs[i], shutdown_tag); + } } } gpr_mu_unlock(&server->mu); @@ -826,7 +857,7 @@ void grpc_server_destroy(grpc_server *server) { void grpc_server_add_listener(grpc_server *server, void *arg, void (*start)(grpc_server *server, void *arg, - grpc_pollset *pollset), + grpc_pollset **pollsets, size_t pollset_count), void (*destroy)(grpc_server *server, void *arg)) { listener *l = gpr_malloc(sizeof(listener)); l->arg = arg; @@ -878,7 +909,7 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_completion_queue *cq_bind, void *tag) { requested_call rc; - grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE); rc.type = BATCH_CALL; rc.tag = tag; rc.data.batch.cq_bind = cq_bind; @@ -889,12 +920,13 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, } grpc_call_error grpc_server_request_registered_call( - grpc_server *server, void *registered_method, grpc_call **call, + grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind, void *tag) { requested_call rc; - grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE); + registered_method *registered_method = rm; + grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE); rc.type = REGISTERED_CALL; rc.tag = tag; rc.data.registered.cq_bind = cq_bind; @@ -909,7 +941,7 @@ grpc_call_error grpc_server_request_registered_call( grpc_call_error grpc_server_request_call_old(grpc_server *server, void *tag_new) { requested_call rc; - grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); + grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW); rc.type = LEGACY_CALL; rc.tag = tag_new; return queue_call_request(server, &rc); @@ -965,6 +997,7 @@ static void begin_call(grpc_server *server, call_data *calld, r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.batch.initial_metadata; r++; + calld->cq_new = server->unregistered_cq; publish = publish_registered_or_batch; break; case REGISTERED_CALL: @@ -979,6 +1012,7 @@ static void begin_call(grpc_server *server, call_data *calld, r->data.recv_message = rc->data.registered.optional_payload; r++; } + calld->cq_new = rc->data.registered.registered_method->cq; publish = publish_registered_or_batch; break; } @@ -991,19 +1025,19 @@ static void begin_call(grpc_server *server, call_data *calld, static void fail_call(grpc_server *server, requested_call *rc) { switch (rc->type) { case LEGACY_CALL: - grpc_cq_end_new_rpc(server->cq, rc->tag, NULL, do_nothing, NULL, NULL, + grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, NULL, NULL, gpr_inf_past, 0, NULL); break; case BATCH_CALL: *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL, + grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); break; case REGISTERED_CALL: *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; - grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL, + grpc_cq_end_op_complete(rc->data.registered.registered_method->cq, rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); break; } @@ -1017,7 +1051,7 @@ static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) { grpc_server *server = chand->server; if (status == GRPC_OP_OK) { - grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL, + grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL, grpc_mdstr_as_c_string(calld->path), grpc_mdstr_as_c_string(calld->host), calld->deadline, calld->legacy->initial_metadata.count, @@ -1032,9 +1066,8 @@ static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - channel_data *chand = elem->channel_data; - grpc_server *server = chand->server; - grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, status); + call_data *calld = elem->call_data; + grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 50574d66a4..c8861f420d 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -48,7 +48,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, and when it shuts down, it will call destroy */ void grpc_server_add_listener(grpc_server *server, void *listener, void (*start)(grpc_server *server, void *arg, - grpc_pollset *pollset), + grpc_pollset **pollsets, size_t npollsets), void (*destroy)(grpc_server *server, void *arg)); /* Setup a transport - creates a channel stack, binds the transport to the diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 5ba7d47efd..3b6abb7d03 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -59,9 +59,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) { } /* Server callback: start listening on our ports */ -static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) { +static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, size_t pollset_count) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(tcp, pollset, new_transport, server); + grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server); } /* Server callback: destroy the tcp listener (so we don't generate further diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 17b0543bcd..938a549d4f 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -84,7 +84,7 @@ Server::~Server() { bool Server::RegisterService(RpcService *service) { for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod *method = service->GetMethod(i); - void *tag = grpc_server_register_method(server_, method->name(), nullptr); + void *tag = grpc_server_register_method(server_, method->name(), nullptr, cq_.cq()); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index e906f302cf..ae6994ef07 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -66,7 +66,7 @@ static void test_no_op(void) { static void test_no_op_with_start(void) { grpc_tcp_server *s = grpc_tcp_server_create(); LOG_TEST(); - grpc_tcp_server_start(s, NULL, on_connect, NULL); + grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); grpc_tcp_server_destroy(s); } @@ -93,7 +93,7 @@ static void test_no_op_with_port_and_start(void) { GPR_ASSERT( grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr))); - grpc_tcp_server_start(s, NULL, on_connect, NULL); + grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); grpc_tcp_server_destroy(s); } @@ -120,7 +120,7 @@ static void test_connect(int n) { GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0); GPR_ASSERT(addr_len <= sizeof(addr)); - grpc_tcp_server_start(s, NULL, on_connect, NULL); + grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); for (i = 0; i < n; i++) { deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(10000000)); |