diff options
author | Vijay Pai <vpai@google.com> | 2015-06-17 12:42:17 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2015-06-17 13:05:34 -0700 |
commit | 8931cdd29fa90fb27d2bad3f0aa03f3abf1ae923 (patch) | |
tree | 43adb97d8c40ada3e3ef54e6a470433addf7433b /src/core | |
parent | ced73bde96f64b40d0798cb915cc59ae1c240f26 (diff) |
Split server lock into 2 pieces: one for call list and one for global
state (including channels)
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/surface/server.c | 95 |
1 files changed, 58 insertions, 37 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c index c8ac559a0d..dac374b7d4 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -141,7 +141,8 @@ struct grpc_server { grpc_pollset **pollsets; size_t cq_count; - gpr_mu mu; + gpr_mu mu_global; + gpr_mu mu_call; registered_method *registered_methods; requested_call_array requested_calls; @@ -200,6 +201,8 @@ static void begin_call(grpc_server *server, call_data *calld, static void fail_call(grpc_server *server, requested_call *rc); static void shutdown_channel(channel_data *chand, int send_goaway, int send_disconnect); +/* Before calling maybe_finish_shutdown, we must hold mu_global and not + hold mu_call */ static void maybe_finish_shutdown(grpc_server *server); static int call_list_join(call_data **root, call_data *call, call_list list) { @@ -273,7 +276,8 @@ static void server_delete(grpc_server *server) { registered_method *rm; size_t i; grpc_channel_args_destroy(server->channel_args); - gpr_mu_destroy(&server->mu); + gpr_mu_destroy(&server->mu_global); + gpr_mu_destroy(&server->mu_call); gpr_free(server->channel_filters); requested_call_array_destroy(&server->requested_calls); while ((rm = server->registered_methods) != NULL) { @@ -335,11 +339,11 @@ static void finish_start_new_rpc_and_unlock(grpc_server *server, if (array->count == 0) { calld->state = PENDING; call_list_join(pending_root, calld, PENDING_START); - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_call); } else { rc = array->calls[--array->count]; calld->state = ACTIVATED; - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_call); begin_call(server, calld, &rc); } } @@ -352,7 +356,7 @@ static void start_new_rpc(grpc_call_element *elem) { gpr_uint32 hash; channel_registered_method *rm; - gpr_mu_lock(&server->mu); + gpr_mu_lock(&server->mu_call); if (chand->registered_methods && calld->path && calld->host) { /* TODO(ctiller): unify these two searches */ /* check for an exact match with host */ @@ -404,11 +408,16 @@ static void maybe_finish_shutdown(grpc_server *server) { if (!server->shutdown || server->shutdown_published) { return; } + + gpr_mu_lock(&server->mu_call); if (server->lists[ALL_CALLS] != NULL) { gpr_log(GPR_DEBUG, "Waiting for all calls to finish before destroying server"); + gpr_mu_unlock(&server->mu_call); return; } + gpr_mu_unlock(&server->mu_call); + if (server->root_channel_data.next != &server->root_channel_data) { gpr_log(GPR_DEBUG, "Waiting for all channels to close before destroying server"); @@ -452,6 +461,7 @@ static void server_on_recv(void *ptr, int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; + int remove_res; if (success && !calld->got_initial_metadata) { size_t i; @@ -476,16 +486,16 @@ static void server_on_recv(void *ptr, int success) { case GRPC_STREAM_SEND_CLOSED: break; case GRPC_STREAM_RECV_CLOSED: - gpr_mu_lock(&chand->server->mu); + gpr_mu_lock(&chand->server->mu_call); if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); } - gpr_mu_unlock(&chand->server->mu); + gpr_mu_unlock(&chand->server->mu_call); break; case GRPC_STREAM_CLOSED: - gpr_mu_lock(&chand->server->mu); + gpr_mu_lock(&chand->server->mu_call); if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); @@ -496,10 +506,13 @@ static void server_on_recv(void *ptr, int success) { grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); } - if (call_list_remove(calld, ALL_CALLS)) { + remove_res = call_list_remove(calld, ALL_CALLS); + gpr_mu_unlock(&chand->server->mu_call); + gpr_mu_lock(&chand->server->mu_global); + if (remove_res) { decrement_call_count(chand); } - gpr_mu_unlock(&chand->server->mu); + gpr_mu_unlock(&chand->server->mu_global); break; } @@ -542,10 +555,10 @@ static void channel_op(grpc_channel_element *elem, case GRPC_TRANSPORT_CLOSED: /* if the transport is closed for a server channel, we destroy the channel */ - gpr_mu_lock(&server->mu); + gpr_mu_lock(&server->mu_global); server_ref(server); destroy_channel(chand); - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_global); server_unref(server); break; case GRPC_TRANSPORT_GOAWAY: @@ -612,10 +625,13 @@ static void init_call_elem(grpc_call_element *elem, calld->deadline = gpr_inf_future; calld->call = grpc_call_from_top_element(elem); - gpr_mu_lock(&chand->server->mu); + gpr_mu_lock(&chand->server->mu_call); call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); + gpr_mu_unlock(&chand->server->mu_call); + + gpr_mu_lock(&chand->server->mu_global); chand->num_calls++; - gpr_mu_unlock(&chand->server->mu); + gpr_mu_unlock(&chand->server->mu_global); server_ref(chand->server); @@ -628,14 +644,16 @@ static void destroy_call_elem(grpc_call_element *elem) { int removed[CALL_LIST_COUNT]; size_t i; - gpr_mu_lock(&chand->server->mu); + gpr_mu_lock(&chand->server->mu_call); for (i = 0; i < CALL_LIST_COUNT; i++) { removed[i] = call_list_remove(elem->call_data, i); } + gpr_mu_unlock(&chand->server->mu_call); + gpr_mu_lock(&chand->server->mu_global); if (removed[ALL_CALLS]) { decrement_call_count(chand); } - gpr_mu_unlock(&chand->server->mu); + gpr_mu_unlock(&chand->server->mu_global); if (calld->host) { grpc_mdstr_unref(calld->host); @@ -678,12 +696,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) { gpr_free(chand->registered_methods); } if (chand->server) { - gpr_mu_lock(&chand->server->mu); + gpr_mu_lock(&chand->server->mu_global); chand->next->prev = chand->prev; chand->prev->next = chand->next; chand->next = chand->prev = chand; maybe_finish_shutdown(chand->server); - gpr_mu_unlock(&chand->server->mu); + gpr_mu_unlock(&chand->server->mu_global); grpc_mdstr_unref(chand->path_key); grpc_mdstr_unref(chand->authority_key); server_unref(chand->server); @@ -730,7 +748,8 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, memset(server, 0, sizeof(grpc_server)); - gpr_mu_init(&server->mu); + gpr_mu_init(&server->mu_global); + gpr_mu_init(&server->mu_call); /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); @@ -880,11 +899,11 @@ grpc_transport_setup_result grpc_server_setup_transport( result = grpc_connected_channel_bind_transport( grpc_channel_get_channel_stack(channel), transport); - gpr_mu_lock(&s->mu); + gpr_mu_lock(&s->mu_global); chand->next = &s->root_channel_data; chand->prev = chand->next->prev; chand->next->prev = chand->prev->next = chand; - gpr_mu_unlock(&s->mu); + gpr_mu_unlock(&s->mu_global); gpr_free(filters); @@ -901,7 +920,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, shutdown_tag *sdt; /* lock, and gather up some stuff to do */ - gpr_mu_lock(&server->mu); + gpr_mu_lock(&server->mu_global); grpc_cq_begin_op(cq, NULL); server->shutdown_tags = gpr_realloc(server->shutdown_tags, @@ -910,7 +929,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, sdt->tag = tag; sdt->cq = cq; if (server->shutdown) { - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_global); return; } @@ -920,6 +939,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, } /* collect all unregistered then registered calls */ + gpr_mu_lock(&server->mu_call); requested_calls = server->requested_calls; memset(&server->requested_calls, 0, sizeof(server->requested_calls)); for (rm = server->registered_methods; rm; rm = rm->next) { @@ -938,10 +958,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server, gpr_free(rm->requested.calls); memset(&rm->requested, 0, sizeof(rm->requested)); } + gpr_mu_unlock(&server->mu_call); server->shutdown = 1; maybe_finish_shutdown(server); - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_global); /* terminate all the requested calls */ for (i = 0; i < requested_calls.count; i++) { @@ -957,10 +978,10 @@ void grpc_server_shutdown_and_notify(grpc_server *server, void grpc_server_listener_destroy_done(void *s) { grpc_server *server = s; - gpr_mu_lock(&server->mu); + gpr_mu_lock(&server->mu_global); server->listeners_destroyed++; maybe_finish_shutdown(server); - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_global); } void grpc_server_cancel_all_calls(grpc_server *server) { @@ -971,12 +992,12 @@ void grpc_server_cancel_all_calls(grpc_server *server) { int is_first = 1; size_t i; - gpr_mu_lock(&server->mu); + gpr_mu_lock(&server->mu_call); GPR_ASSERT(server->shutdown); if (!server->lists[ALL_CALLS]) { - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_call); return; } @@ -996,7 +1017,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) { is_first = 0; } - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_call); for (i = 0; i < call_count; i++) { grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, @@ -1010,7 +1031,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) { void grpc_server_destroy(grpc_server *server) { listener *l; - gpr_mu_lock(&server->mu); + gpr_mu_lock(&server->mu_global); GPR_ASSERT(server->shutdown || !server->listeners); GPR_ASSERT(server->listeners_destroyed == num_listeners(server)); @@ -1020,7 +1041,7 @@ void grpc_server_destroy(grpc_server *server) { gpr_free(l); } - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_global); server_unref(server); } @@ -1042,9 +1063,9 @@ static grpc_call_error queue_call_request(grpc_server *server, requested_call *rc) { call_data *calld = NULL; requested_call_array *requested_calls = NULL; - gpr_mu_lock(&server->mu); + gpr_mu_lock(&server->mu_call); if (server->shutdown) { - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_call); fail_call(server, rc); return GRPC_CALL_OK; } @@ -1063,12 +1084,12 @@ static grpc_call_error queue_call_request(grpc_server *server, if (calld) { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_call); begin_call(server, calld, rc); return GRPC_CALL_OK; } else { *requested_call_array_add(requested_calls) = *rc; - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_call); return GRPC_CALL_OK; } } @@ -1212,8 +1233,8 @@ const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { int grpc_server_has_open_connections(grpc_server *server) { int r; - gpr_mu_lock(&server->mu); + gpr_mu_lock(&server->mu_global); r = server->root_channel_data.next != &server->root_channel_data; - gpr_mu_unlock(&server->mu); + gpr_mu_unlock(&server->mu_global); return r; } |