aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/completion_queue.c13
-rw-r--r--src/core/lib/surface/completion_queue.h4
-rw-r--r--src/core/lib/surface/server.c53
3 files changed, 49 insertions, 21 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 1f82c3bad2..ae78f8f616 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -70,6 +70,8 @@ struct grpc_completion_queue {
int shutdown;
int shutdown_called;
int is_server_cq;
+ /** Can the server cq accept incoming channels */
+ int is_non_listening_server_cq;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
grpc_closure pollset_shutdown_done;
@@ -149,6 +151,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->shutdown = 0;
cc->shutdown_called = 0;
cc->is_server_cq = 0;
+ cc->is_non_listening_server_cq = 0;
cc->num_pluckers = 0;
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
@@ -511,6 +514,14 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return POLLSET_FROM_CQ(cc);
}
+void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
+ cc->is_non_listening_server_cq = 1;
+}
+
+bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
+ return (cc->is_non_listening_server_cq == 1);
+}
+
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index eef82cf014..1528ca4ad8 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -82,6 +82,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
+void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc);
+bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 2db95b9cdf..c9b458faf2 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -895,23 +895,45 @@ const grpc_channel_filter grpc_server_top_filter = {
"server",
};
-void grpc_server_register_completion_queue(grpc_server *server,
- grpc_completion_queue *cq,
- void *reserved) {
+static void register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq,
+ bool is_non_listening, void *reserved) {
size_t i, n;
- GRPC_API_TRACE(
- "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
- (server, cq, reserved));
GPR_ASSERT(!reserved);
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
}
- GRPC_CQ_INTERNAL_REF(cq, "server");
+
grpc_cq_mark_server_cq(cq);
- n = server->cq_count++;
- server->cqs = gpr_realloc(server->cqs,
- server->cq_count * sizeof(grpc_completion_queue *));
- server->cqs[n] = cq;
+
+ /* Non-listening completion queues are not added to server->cqs */
+ if (is_non_listening) {
+ grpc_cq_mark_non_listening_server_cq(cq);
+ } else {
+ GRPC_CQ_INTERNAL_REF(cq, "server");
+ n = server->cq_count++;
+ server->cqs = gpr_realloc(
+ server->cqs, server->cq_count * sizeof(grpc_completion_queue *));
+ server->cqs[n] = cq;
+ }
+}
+
+void grpc_server_register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq,
+ void *reserved) {
+ GRPC_API_TRACE(
+ "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
+ (server, cq, reserved));
+ register_completion_queue(server, cq, false, reserved);
+}
+
+void grpc_server_register_non_listening_completion_queue(
+ grpc_server *server, grpc_completion_queue *cq, void *reserved) {
+ GRPC_API_TRACE(
+ "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, "
+ "reserved=%p)",
+ 3, (server, cq, reserved));
+ register_completion_queue(server, cq, true, reserved);
}
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
@@ -1018,7 +1040,6 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport,
const grpc_channel_args *args) {
- size_t i;
size_t num_registered_methods;
size_t alloc;
registered_method *rm;
@@ -1033,12 +1054,6 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
uint32_t max_probes = 0;
grpc_transport_op op;
- for (i = 0; i < s->cq_count; i++) {
- memset(&op, 0, sizeof(op));
- op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
- grpc_transport_perform_op(exec_ctx, transport, &op);
- }
-
channel =
grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);
chand = (channel_data *)grpc_channel_stack_element(