diff options
Diffstat (limited to 'src/core/lib/surface/server.c')
-rw-r--r-- | src/core/lib/surface/server.c | 71 |
1 files changed, 47 insertions, 24 deletions
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 191ee75252..934ca0431a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -44,6 +44,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/stack_lockfree.h" @@ -211,6 +212,11 @@ struct grpc_server { gpr_mu mu_global; /* mutex for server and channel state */ gpr_mu mu_call; /* mutex for call-specific state */ + /* startup synchronization: flag is protected by mu_global, signals whether + we are doing the listener start routine or not */ + bool starting; + gpr_cv starting_cv; + registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; @@ -339,7 +345,7 @@ static void request_matcher_destroy(request_matcher *rm) { static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, grpc_error *error) { - grpc_call_destroy(grpc_call_from_top_element(elem)); + grpc_call_unref(grpc_call_from_top_element(elem)); } static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, @@ -388,6 +394,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { grpc_channel_args_destroy(exec_ctx, server->channel_args); gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); + gpr_cv_destroy(&server->starting_cv); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; if (server->started) { @@ -974,7 +981,7 @@ const grpc_channel_filter grpc_server_top_filter = { static void register_completion_queue(grpc_server *server, grpc_completion_queue *cq, - bool is_non_listening, void *reserved) { + void *reserved) { size_t i, n; GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { @@ -983,10 +990,6 @@ static void register_completion_queue(grpc_server *server, grpc_cq_mark_server_cq(cq); - if (is_non_listening) { - grpc_cq_mark_non_listening_server_cq(cq); - } - GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, @@ -1000,16 +1003,16 @@ void grpc_server_register_completion_queue(grpc_server *server, GRPC_API_TRACE( "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, (server, cq, reserved)); - register_completion_queue(server, cq, false, reserved); -} -void grpc_server_register_non_listening_completion_queue( - grpc_server *server, grpc_completion_queue *cq, void *reserved) { - GRPC_API_TRACE( - "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, " - "reserved=%p)", - 3, (server, cq, reserved)); - register_completion_queue(server, cq, true, reserved); + if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) { + gpr_log(GPR_INFO, + "Completion queue which is not of type GRPC_CQ_NEXT is being " + "registered as a server-completion-queue"); + /* Ideally we should log an error and abort but ruby-wrapped-language API + calls grpc_completion_queue_pluck() on server completion queues */ + } + + register_completion_queue(server, cq, reserved); } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { @@ -1017,10 +1020,9 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { grpc_server *server = gpr_zalloc(sizeof(grpc_server)); - GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); - gpr_mu_init(&server->mu_global); gpr_mu_init(&server->mu_call); + gpr_cv_init(&server->starting_cv); /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); @@ -1077,8 +1079,22 @@ void *grpc_server_register_method( return m; } +static void start_listeners(grpc_exec_ctx *exec_ctx, void *s, + grpc_error *error) { + grpc_server *server = s; + for (listener *l = server->listeners; l; l = l->next) { + l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count); + } + + gpr_mu_lock(&server->mu_global); + server->starting = false; + gpr_cv_signal(&server->starting_cv); + gpr_mu_unlock(&server->mu_global); + + server_unref(exec_ctx, server); +} + void grpc_server_start(grpc_server *server) { - listener *l; size_t i; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -1092,7 +1108,7 @@ void grpc_server_start(grpc_server *server) { server->requested_calls_per_cq = gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { - if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { + if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } @@ -1112,10 +1128,11 @@ void grpc_server_start(grpc_server *server) { (size_t)server->max_requested_calls_per_cq, server); } - for (l = server->listeners; l; l = l->next) { - l->start(&exec_ctx, server, l->arg, server->pollsets, - server->pollset_count); - } + server_ref(server); + server->starting = true; + grpc_closure_sched(&exec_ctx, grpc_closure_create(start_listeners, server, + grpc_executor_scheduler), + GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } @@ -1249,8 +1266,14 @@ void grpc_server_shutdown_and_notify(grpc_server *server, GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3, (server, cq, tag)); - /* lock, and gather up some stuff to do */ + /* wait for startup to be finished: locks mu_global */ gpr_mu_lock(&server->mu_global); + while (server->starting) { + gpr_cv_wait(&server->starting_cv, &server->mu_global, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + + /* stay locked, and gather up some stuff to do */ grpc_cq_begin_op(cq, tag); if (server->shutdown_published) { grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, |