aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/server.c')
-rw-r--r--src/core/lib/surface/server.c71
1 files changed, 47 insertions, 24 deletions
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 191ee75252..934ca0431a 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -44,6 +44,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/stack_lockfree.h"
@@ -211,6 +212,11 @@ struct grpc_server {
gpr_mu mu_global; /* mutex for server and channel state */
gpr_mu mu_call; /* mutex for call-specific state */
+ /* startup synchronization: flag is protected by mu_global, signals whether
+ we are doing the listener start routine or not */
+ bool starting;
+ gpr_cv starting_cv;
+
registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
@@ -339,7 +345,7 @@ static void request_matcher_destroy(request_matcher *rm) {
static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem,
grpc_error *error) {
- grpc_call_destroy(grpc_call_from_top_element(elem));
+ grpc_call_unref(grpc_call_from_top_element(elem));
}
static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
@@ -388,6 +394,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
grpc_channel_args_destroy(exec_ctx, server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
+ gpr_cv_destroy(&server->starting_cv);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
if (server->started) {
@@ -974,7 +981,7 @@ const grpc_channel_filter grpc_server_top_filter = {
static void register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
- bool is_non_listening, void *reserved) {
+ void *reserved) {
size_t i, n;
GPR_ASSERT(!reserved);
for (i = 0; i < server->cq_count; i++) {
@@ -983,10 +990,6 @@ static void register_completion_queue(grpc_server *server,
grpc_cq_mark_server_cq(cq);
- if (is_non_listening) {
- grpc_cq_mark_non_listening_server_cq(cq);
- }
-
GRPC_CQ_INTERNAL_REF(cq, "server");
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
@@ -1000,16 +1003,16 @@ void grpc_server_register_completion_queue(grpc_server *server,
GRPC_API_TRACE(
"grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
(server, cq, reserved));
- register_completion_queue(server, cq, false, reserved);
-}
-void grpc_server_register_non_listening_completion_queue(
- grpc_server *server, grpc_completion_queue *cq, void *reserved) {
- GRPC_API_TRACE(
- "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, "
- "reserved=%p)",
- 3, (server, cq, reserved));
- register_completion_queue(server, cq, true, reserved);
+ if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) {
+ gpr_log(GPR_INFO,
+ "Completion queue which is not of type GRPC_CQ_NEXT is being "
+ "registered as a server-completion-queue");
+ /* Ideally we should log an error and abort but ruby-wrapped-language API
+ calls grpc_completion_queue_pluck() on server completion queues */
+ }
+
+ register_completion_queue(server, cq, reserved);
}
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
@@ -1017,10 +1020,9 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
grpc_server *server = gpr_zalloc(sizeof(grpc_server));
- GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
-
gpr_mu_init(&server->mu_global);
gpr_mu_init(&server->mu_call);
+ gpr_cv_init(&server->starting_cv);
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
@@ -1077,8 +1079,22 @@ void *grpc_server_register_method(
return m;
}
+static void start_listeners(grpc_exec_ctx *exec_ctx, void *s,
+ grpc_error *error) {
+ grpc_server *server = s;
+ for (listener *l = server->listeners; l; l = l->next) {
+ l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count);
+ }
+
+ gpr_mu_lock(&server->mu_global);
+ server->starting = false;
+ gpr_cv_signal(&server->starting_cv);
+ gpr_mu_unlock(&server->mu_global);
+
+ server_unref(exec_ctx, server);
+}
+
void grpc_server_start(grpc_server *server) {
- listener *l;
size_t i;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -1092,7 +1108,7 @@ void grpc_server_start(grpc_server *server) {
server->requested_calls_per_cq =
gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
- if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
+ if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]);
}
@@ -1112,10 +1128,11 @@ void grpc_server_start(grpc_server *server) {
(size_t)server->max_requested_calls_per_cq, server);
}
- for (l = server->listeners; l; l = l->next) {
- l->start(&exec_ctx, server, l->arg, server->pollsets,
- server->pollset_count);
- }
+ server_ref(server);
+ server->starting = true;
+ grpc_closure_sched(&exec_ctx, grpc_closure_create(start_listeners, server,
+ grpc_executor_scheduler),
+ GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -1249,8 +1266,14 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
(server, cq, tag));
- /* lock, and gather up some stuff to do */
+ /* wait for startup to be finished: locks mu_global */
gpr_mu_lock(&server->mu_global);
+ while (server->starting) {
+ gpr_cv_wait(&server->starting_cv, &server->mu_global,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+
+ /* stay locked, and gather up some stuff to do */
grpc_cq_begin_op(cq, tag);
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,