aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-12 09:02:56 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-12 09:02:56 -0800
commit20bc56d8908358b1e4f8e8d853a3d3c53f8cb4fe (patch)
tree2c9e02ebef55da5953a9049c6220757b8a1dbc68
parentfada7d43b1ee8d128e4ee5267ab859e84ce56752 (diff)
Multi-completion-queue-server
Allow binding a different completion queue to each registered method. This will allow multiplexing for the C++ server between sync & async methods more easily.
-rw-r--r--include/grpc/grpc.h3
-rw-r--r--src/core/iomgr/tcp_server.h5
-rw-r--r--src/core/iomgr/tcp_server_posix.c11
-rw-r--r--src/core/security/server_secure_chttp2.c5
-rw-r--r--src/core/surface/server.c79
-rw-r--r--src/core/surface/server.h2
-rw-r--r--src/core/surface/server_chttp2.c4
-rw-r--r--src/cpp/server/server.cc2
-rw-r--r--test/core/iomgr/tcp_server_posix_test.c6
9 files changed, 77 insertions, 40 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index f6c7da4654..bd766b55b3 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -565,7 +565,8 @@ grpc_call_error grpc_server_request_call(
Must be called before grpc_server_start.
Returns NULL on failure. */
void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host);
+ const char *host,
+ grpc_completion_queue *new_call_cq);
/* Request notification of a new pre-registered call */
grpc_call_error grpc_server_request_registered_call(
diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h
index 2558a1eb9f..11f9b05663 100644
--- a/src/core/iomgr/tcp_server.h
+++ b/src/core/iomgr/tcp_server.h
@@ -46,8 +46,9 @@ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep);
grpc_tcp_server *grpc_tcp_server_create(void);
/* Start listening to bound ports */
-void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset *pollset,
- grpc_tcp_server_cb cb, void *cb_arg);
+void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset **pollsets,
+ size_t pollset_count, grpc_tcp_server_cb cb,
+ void *cb_arg);
/* Add a port to the server, returning port number on success, or negative
on failure.
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 091f0aab1a..e362dd1e6c 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -353,9 +353,10 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) {
return (index < s->nports) ? s->ports[index].fd : -1;
}
-void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset,
- grpc_tcp_server_cb cb, void *cb_arg) {
- size_t i;
+void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets,
+ size_t pollset_count, grpc_tcp_server_cb cb,
+ void *cb_arg) {
+ size_t i, j;
GPR_ASSERT(cb);
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->cb);
@@ -363,8 +364,8 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset,
s->cb = cb;
s->cb_arg = cb_arg;
for (i = 0; i < s->nports; i++) {
- if (pollset) {
- grpc_pollset_add_fd(pollset, s->ports[i].emfd);
+ for (j = 0; j < pollset_count; j++) {
+ grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd);
}
grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]);
s->active_ports++;
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 480c882794..19056ba23e 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -76,9 +76,10 @@ static void on_accept(void *server, grpc_endpoint *tcp) {
/* Note: the following code is the same with server_chttp2.c */
/* Server callback: start listening on our ports */
-static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) {
+static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets,
+ size_t pollset_count) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(tcp, pollset, on_accept, server);
+ grpc_tcp_server_start(tcp, pollsets, pollset_count, on_accept, server);
}
/* Server callback: destroy the tcp listener (so we don't generate further
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 80b248ee84..22588194ea 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -53,7 +53,7 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
- void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset);
+ void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, size_t pollset_count);
void (*destroy)(grpc_server *server, void *arg);
struct listener *next;
} listener;
@@ -101,6 +101,7 @@ struct registered_method {
char *host;
call_data *pending;
requested_call_array requested;
+ grpc_completion_queue *cq;
registered_method *next;
};
@@ -127,7 +128,11 @@ struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
grpc_channel_args *channel_args;
- grpc_completion_queue *cq;
+ grpc_completion_queue *unregistered_cq;
+
+ grpc_completion_queue **cqs;
+ grpc_pollset **pollsets;
+ size_t cq_count;
gpr_mu mu;
@@ -169,6 +174,7 @@ struct call_data {
grpc_mdstr *host;
legacy_data *legacy;
+ grpc_completion_queue *cq_new;
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
@@ -496,7 +502,7 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- int i;
+ size_t i;
gpr_mu_lock(&chand->server->mu);
for (i = 0; i < CALL_LIST_COUNT; i++) {
@@ -504,7 +510,9 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
if (chand->server->shutdown && chand->server->have_shutdown_tag &&
chand->server->lists[ALL_CALLS] == NULL) {
- grpc_cq_end_server_shutdown(chand->server->cq, chand->server->shutdown_tag);
+ for (i = 0; i < chand->server->cq_count; i++) {
+ grpc_cq_end_server_shutdown(chand->server->cqs[i], chand->server->shutdown_tag);
+ }
}
gpr_mu_unlock(&chand->server->mu);
@@ -557,6 +565,16 @@ static const grpc_channel_filter server_surface_filter = {
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
};
+static void addcq(grpc_server *server, grpc_completion_queue *cq) {
+ size_t i, n;
+ for (i = 0; i < server->cq_count; i++) {
+ if (server->cqs[i] == cq) return;
+ }
+ n = server->cq_count++;
+ server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue*));
+ server->cqs[n] = cq;
+}
+
grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
grpc_channel_filter **filters,
size_t filter_count,
@@ -566,10 +584,11 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
grpc_server *server = gpr_malloc(sizeof(grpc_server));
memset(server, 0, sizeof(grpc_server));
+ if (cq) addcq(server, cq);
gpr_mu_init(&server->mu);
- server->cq = cq;
+ server->unregistered_cq = cq;
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev =
@@ -605,7 +624,7 @@ static int streq(const char *a, const char *b) {
}
void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host) {
+ const char *host, grpc_completion_queue *cq_new_rpc) {
registered_method *m;
if (!method) {
gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
@@ -618,20 +637,28 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
return NULL;
}
}
+ addcq(server, cq_new_rpc);
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
+ m->cq = cq_new_rpc;
server->registered_methods = m;
return m;
}
void grpc_server_start(grpc_server *server) {
listener *l;
+ size_t i;
+
+ server->pollsets = gpr_malloc(sizeof(grpc_pollset*) * server->cq_count);
+ for (i = 0; i < server->cq_count; i++) {
+ server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
+ }
for (l = server->listeners; l; l = l->next) {
- l->start(server, l->arg, grpc_cq_pollset(server->cq));
+ l->start(server, l->arg, server->pollsets, server->cq_count);
}
}
@@ -664,7 +691,9 @@ grpc_transport_setup_result grpc_server_setup_transport(
}
filters[i] = &grpc_connected_channel_filter;
- grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq));
+ for (i = 0; i < s->cq_count; i++) {
+ grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
+ }
channel = grpc_channel_create_from_filters(filters, num_filters,
s->channel_args, mdctx, 0);
@@ -765,9 +794,11 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
server->have_shutdown_tag = have_shutdown_tag;
server->shutdown_tag = shutdown_tag;
if (have_shutdown_tag) {
- grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN);
- if (server->lists[ALL_CALLS] == NULL) {
- grpc_cq_end_server_shutdown(server->cq, shutdown_tag);
+ for (i = 0; i < server->cq_count; i++) {
+ grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
+ if (server->lists[ALL_CALLS] == NULL) {
+ grpc_cq_end_server_shutdown(server->cqs[i], shutdown_tag);
+ }
}
}
gpr_mu_unlock(&server->mu);
@@ -826,7 +857,7 @@ void grpc_server_destroy(grpc_server *server) {
void grpc_server_add_listener(grpc_server *server, void *arg,
void (*start)(grpc_server *server, void *arg,
- grpc_pollset *pollset),
+ grpc_pollset **pollsets, size_t pollset_count),
void (*destroy)(grpc_server *server, void *arg)) {
listener *l = gpr_malloc(sizeof(listener));
l->arg = arg;
@@ -878,7 +909,7 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_completion_queue *cq_bind,
void *tag) {
requested_call rc;
- grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
rc.type = BATCH_CALL;
rc.tag = tag;
rc.data.batch.cq_bind = cq_bind;
@@ -889,12 +920,13 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
}
grpc_call_error grpc_server_request_registered_call(
- grpc_server *server, void *registered_method, grpc_call **call,
+ grpc_server *server, void *rm, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *initial_metadata,
grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind,
void *tag) {
requested_call rc;
- grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
+ registered_method *registered_method = rm;
+ grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
rc.type = REGISTERED_CALL;
rc.tag = tag;
rc.data.registered.cq_bind = cq_bind;
@@ -909,7 +941,7 @@ grpc_call_error grpc_server_request_registered_call(
grpc_call_error grpc_server_request_call_old(grpc_server *server,
void *tag_new) {
requested_call rc;
- grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
+ grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW);
rc.type = LEGACY_CALL;
rc.tag = tag_new;
return queue_call_request(server, &rc);
@@ -965,6 +997,7 @@ static void begin_call(grpc_server *server, call_data *calld,
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata;
r++;
+ calld->cq_new = server->unregistered_cq;
publish = publish_registered_or_batch;
break;
case REGISTERED_CALL:
@@ -979,6 +1012,7 @@ static void begin_call(grpc_server *server, call_data *calld,
r->data.recv_message = rc->data.registered.optional_payload;
r++;
}
+ calld->cq_new = rc->data.registered.registered_method->cq;
publish = publish_registered_or_batch;
break;
}
@@ -991,19 +1025,19 @@ static void begin_call(grpc_server *server, call_data *calld,
static void fail_call(grpc_server *server, requested_call *rc) {
switch (rc->type) {
case LEGACY_CALL:
- grpc_cq_end_new_rpc(server->cq, rc->tag, NULL, do_nothing, NULL, NULL,
+ grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, NULL,
NULL, gpr_inf_past, 0, NULL);
break;
case BATCH_CALL:
*rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
- grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
+ grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
GRPC_OP_ERROR);
break;
case REGISTERED_CALL:
*rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0;
- grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
+ grpc_cq_end_op_complete(rc->data.registered.registered_method->cq, rc->tag, NULL, do_nothing, NULL,
GRPC_OP_ERROR);
break;
}
@@ -1017,7 +1051,7 @@ static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
grpc_server *server = chand->server;
if (status == GRPC_OP_OK) {
- grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL,
+ grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL,
grpc_mdstr_as_c_string(calld->path),
grpc_mdstr_as_c_string(calld->host), calld->deadline,
calld->legacy->initial_metadata.count,
@@ -1032,9 +1066,8 @@ static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
- channel_data *chand = elem->channel_data;
- grpc_server *server = chand->server;
- grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, status);
+ call_data *calld = elem->call_data;
+ grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 50574d66a4..c8861f420d 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -48,7 +48,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
and when it shuts down, it will call destroy */
void grpc_server_add_listener(grpc_server *server, void *listener,
void (*start)(grpc_server *server, void *arg,
- grpc_pollset *pollset),
+ grpc_pollset **pollsets, size_t npollsets),
void (*destroy)(grpc_server *server, void *arg));
/* Setup a transport - creates a channel stack, binds the transport to the
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 5ba7d47efd..3b6abb7d03 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -59,9 +59,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
}
/* Server callback: start listening on our ports */
-static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) {
+static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, size_t pollset_count) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(tcp, pollset, new_transport, server);
+ grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server);
}
/* Server callback: destroy the tcp listener (so we don't generate further
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 17b0543bcd..938a549d4f 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -84,7 +84,7 @@ Server::~Server() {
bool Server::RegisterService(RpcService *service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod *method = service->GetMethod(i);
- void *tag = grpc_server_register_method(server_, method->name(), nullptr);
+ void *tag = grpc_server_register_method(server_, method->name(), nullptr, cq_.cq());
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c
index e906f302cf..ae6994ef07 100644
--- a/test/core/iomgr/tcp_server_posix_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -66,7 +66,7 @@ static void test_no_op(void) {
static void test_no_op_with_start(void) {
grpc_tcp_server *s = grpc_tcp_server_create();
LOG_TEST();
- grpc_tcp_server_start(s, NULL, on_connect, NULL);
+ grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
grpc_tcp_server_destroy(s);
}
@@ -93,7 +93,7 @@ static void test_no_op_with_port_and_start(void) {
GPR_ASSERT(
grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)));
- grpc_tcp_server_start(s, NULL, on_connect, NULL);
+ grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
grpc_tcp_server_destroy(s);
}
@@ -120,7 +120,7 @@ static void test_connect(int n) {
GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0);
GPR_ASSERT(addr_len <= sizeof(addr));
- grpc_tcp_server_start(s, NULL, on_connect, NULL);
+ grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
for (i = 0; i < n; i++) {
deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(10000000));