aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-08 15:43:18 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-08 15:43:18 -0700
commitea0c361e2376aa215b6f9f5b24215f0476b5f980 (patch)
treede98b90037a091491f35b1ec3f339172dc4ca35e /src/core
parentb4c55ea07294889a1f9ad5b3a334ae787b181e4c (diff)
parent3f03f9164a2084051bd741c4970f11bbde5feb9b (diff)
Merge branch 'screw-you-guys-im-taking-my-own-lock' into simpler-cq
Diffstat (limited to 'src/core')
-rw-r--r--src/core/surface/server.c69
1 files changed, 43 insertions, 26 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 9a29060f2b..1a99acd536 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -179,7 +179,11 @@ typedef enum {
struct call_data {
grpc_call *call;
+ /** protects state */
+ gpr_mu mu_state;
+ /** the current state of a call - see call_state */
call_state state;
+
grpc_mdstr *path;
grpc_mdstr *host;
gpr_timespec deadline;
@@ -382,19 +386,24 @@ static void destroy_channel(channel_data *chand) {
grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
}
-static void finish_start_new_rpc_and_unlock(grpc_server *server,
- grpc_call_element *elem,
- call_data **pending_root,
- requested_call **requests) {
- requested_call *rc = *requests;
+static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
+ call_data **pending_root,
+ requested_call **requests) {
+ requested_call *rc;
call_data *calld = elem->call_data;
+ gpr_mu_lock(&server->mu_call);
+ rc = *requests;
if (rc == NULL) {
+ gpr_mu_lock(&calld->mu_state);
calld->state = PENDING;
+ gpr_mu_unlock(&calld->mu_state);
call_list_join(pending_root, calld, PENDING_START);
gpr_mu_unlock(&server->mu_call);
} else {
*requests = rc->next;
+ gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
+ gpr_mu_unlock(&calld->mu_state);
gpr_mu_unlock(&server->mu_call);
begin_call(server, calld, rc);
}
@@ -408,7 +417,6 @@ static void start_new_rpc(grpc_call_element *elem) {
gpr_uint32 hash;
channel_registered_method *rm;
- gpr_mu_lock(&server->mu_call);
if (chand->registered_methods && calld->path && calld->host) {
/* TODO(ctiller): unify these two searches */
/* check for an exact match with host */
@@ -419,9 +427,8 @@ static void start_new_rpc(grpc_call_element *elem) {
if (!rm) break;
if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue;
- finish_start_new_rpc_and_unlock(server, elem,
- &rm->server_registered_method->pending,
- &rm->server_registered_method->requests);
+ finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
+ &rm->server_registered_method->requests);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -432,14 +439,13 @@ static void start_new_rpc(grpc_call_element *elem) {
if (!rm) break;
if (rm->host != NULL) continue;
if (rm->method != calld->path) continue;
- finish_start_new_rpc_and_unlock(server, elem,
- &rm->server_registered_method->pending,
- &rm->server_registered_method->requests);
+ finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
+ &rm->server_registered_method->requests);
return;
}
}
- finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
- &server->requests);
+ finish_start_new_rpc(server, elem, &server->lists[PENDING_START],
+ &server->requests);
}
static void kill_zombie(void *elem, int success) {
@@ -530,27 +536,34 @@ static void server_on_recv(void *ptr, int success) {
case GRPC_STREAM_SEND_CLOSED:
break;
case GRPC_STREAM_RECV_CLOSED:
- gpr_mu_lock(&chand->server->mu_call);
+ gpr_mu_lock(&calld->mu_state);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
}
- gpr_mu_unlock(&chand->server->mu_call);
break;
case GRPC_STREAM_CLOSED:
- gpr_mu_lock(&chand->server->mu_call);
+ gpr_mu_lock(&calld->mu_state);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} else if (calld->state == PENDING) {
- call_list_remove(calld, PENDING_START);
calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
+ gpr_mu_lock(&chand->server->mu_call);
+ call_list_remove(calld, PENDING_START);
+ gpr_mu_unlock(&chand->server->mu_call);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
}
- gpr_mu_unlock(&chand->server->mu_call);
break;
}
@@ -612,6 +625,7 @@ static void init_call_elem(grpc_call_element *elem,
memset(calld, 0, sizeof(call_data));
calld->deadline = gpr_inf_future;
calld->call = grpc_call_from_top_element(elem);
+ gpr_mu_init(&calld->mu_state);
grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
@@ -623,13 +637,12 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- size_t i;
- gpr_mu_lock(&chand->server->mu_call);
- for (i = 0; i < CALL_LIST_COUNT; i++) {
- call_list_remove(elem->call_data, i);
+ if (calld->state == PENDING) {
+ gpr_mu_lock(&chand->server->mu_call);
+ call_list_remove(elem->call_data, PENDING_START);
+ gpr_mu_unlock(&chand->server->mu_call);
}
- gpr_mu_unlock(&chand->server->mu_call);
if (calld->host) {
grpc_mdstr_unref(calld->host);
@@ -638,6 +651,8 @@ static void destroy_call_elem(grpc_call_element *elem) {
grpc_mdstr_unref(calld->path);
}
+ gpr_mu_destroy(&calld->mu_state);
+
server_unref(chand->server);
}
@@ -1024,10 +1039,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
requests = &rc->data.registered.registered_method->requests;
break;
}
- if (calld) {
+ if (calld != NULL) {
+ gpr_mu_unlock(&server->mu_call);
+ gpr_mu_lock(&calld->mu_state);
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
- gpr_mu_unlock(&server->mu_call);
+ gpr_mu_unlock(&calld->mu_state);
begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {