aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/server.c
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-07-15 14:15:58 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-07-15 14:15:58 -0700
commit211e65be3cb1f4b6f7184c1945d8284905f79968 (patch)
treee6c7ee9e1612a6b12fa99446a72dda5d5d64769f /src/core/surface/server.c
parentffe3d2b1908933a149932502fd0607239ccef88a (diff)
parent3e5d61670e23b040ed47b2df1e4c87ee2cfec4aa (diff)
Merge branch 'master' of github.com:grpc/grpc into str_join_with_sep
Diffstat (limited to 'src/core/surface/server.c')
-rw-r--r--src/core/surface/server.c281
1 files changed, 153 insertions, 128 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 341ca2942c..4dc51bf031 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -72,12 +72,14 @@ typedef struct {
typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
-typedef struct {
+typedef struct requested_call {
requested_call_type type;
+ struct requested_call *next;
void *tag;
grpc_completion_queue *cq_bound_to_call;
grpc_completion_queue *cq_for_notification;
grpc_call **call;
+ grpc_cq_completion completion;
union {
struct {
grpc_call_details *details;
@@ -92,17 +94,11 @@ typedef struct {
} data;
} requested_call;
-typedef struct {
- requested_call *calls;
- size_t count;
- size_t capacity;
-} requested_call_array;
-
struct registered_method {
char *method;
char *host;
call_data *pending;
- requested_call_array requested;
+ requested_call *requests;
registered_method *next;
};
@@ -131,6 +127,7 @@ struct channel_data {
typedef struct shutdown_tag {
void *tag;
grpc_completion_queue *cq;
+ grpc_cq_completion completion;
} shutdown_tag;
struct grpc_server {
@@ -153,7 +150,7 @@ struct grpc_server {
gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
- requested_call_array requested_calls;
+ requested_call *requests;
gpr_uint8 shutdown;
gpr_uint8 shutdown_published;
@@ -166,6 +163,9 @@ struct grpc_server {
listener *listeners;
int listeners_destroyed;
gpr_refcount internal_refcount;
+
+ /** when did we print the last shutdown progress message */
+ gpr_timespec last_shutdown_message_time;
};
typedef enum {
@@ -182,7 +182,11 @@ typedef enum {
struct call_data {
grpc_call *call;
+ /** protects state */
+ gpr_mu mu_state;
+ /** the current state of a call - see call_state */
call_state state;
+
grpc_mdstr *path;
grpc_mdstr *host;
gpr_timespec deadline;
@@ -266,7 +270,8 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
}
static void channel_broadcaster_shutdown(channel_broadcaster *cb,
- int send_goaway, int force_disconnect) {
+ int send_goaway,
+ int force_disconnect) {
size_t i;
for (i = 0; i < cb->num_channels; i++) {
@@ -325,22 +330,6 @@ static int call_list_remove(call_data *call, call_list list) {
return 1;
}
-static void requested_call_array_destroy(requested_call_array *array) {
- gpr_free(array->calls);
-}
-
-static requested_call *requested_call_array_add(requested_call_array *array) {
- requested_call *rc;
- if (array->count == array->capacity) {
- array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
- array->calls =
- gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
- }
- rc = &array->calls[array->count++];
- memset(rc, 0, sizeof(*rc));
- return rc;
-}
-
static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
@@ -352,12 +341,10 @@ static void server_delete(grpc_server *server) {
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
gpr_free(server->channel_filters);
- requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
gpr_free(rm->method);
gpr_free(rm->host);
- requested_call_array_destroy(&rm->requested);
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
@@ -403,21 +390,26 @@ static void destroy_channel(channel_data *chand) {
grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
}
-static void finish_start_new_rpc_and_unlock(grpc_server *server,
- grpc_call_element *elem,
- call_data **pending_root,
- requested_call_array *array) {
- requested_call rc;
+static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
+ call_data **pending_root,
+ requested_call **requests) {
+ requested_call *rc;
call_data *calld = elem->call_data;
- if (array->count == 0) {
+ gpr_mu_lock(&server->mu_call);
+ rc = *requests;
+ if (rc == NULL) {
+ gpr_mu_lock(&calld->mu_state);
calld->state = PENDING;
+ gpr_mu_unlock(&calld->mu_state);
call_list_join(pending_root, calld, PENDING_START);
gpr_mu_unlock(&server->mu_call);
} else {
- rc = array->calls[--array->count];
+ *requests = rc->next;
+ gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
+ gpr_mu_unlock(&calld->mu_state);
gpr_mu_unlock(&server->mu_call);
- begin_call(server, calld, &rc);
+ begin_call(server, calld, rc);
}
}
@@ -429,20 +421,18 @@ static void start_new_rpc(grpc_call_element *elem) {
gpr_uint32 hash;
channel_registered_method *rm;
- gpr_mu_lock(&server->mu_call);
if (chand->registered_methods && calld->path && calld->host) {
/* TODO(ctiller): unify these two searches */
/* check for an exact match with host */
hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
- for (i = 0; i < chand->registered_method_max_probes; i++) {
+ for (i = 0; i <= chand->registered_method_max_probes; i++) {
rm = &chand->registered_methods[(hash + i) %
chand->registered_method_slots];
if (!rm) break;
if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue;
- finish_start_new_rpc_and_unlock(server, elem,
- &rm->server_registered_method->pending,
- &rm->server_registered_method->requested);
+ finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
+ &rm->server_registered_method->requests);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -453,14 +443,13 @@ static void start_new_rpc(grpc_call_element *elem) {
if (!rm) break;
if (rm->host != NULL) continue;
if (rm->method != calld->path) continue;
- finish_start_new_rpc_and_unlock(server, elem,
- &rm->server_registered_method->pending,
- &rm->server_registered_method->requested);
+ finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
+ &rm->server_registered_method->requests);
return;
}
}
- finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
- &server->requested_calls);
+ finish_start_new_rpc(server, elem, &server->lists[PENDING_START],
+ &server->requests);
}
static void kill_zombie(void *elem, int success) {
@@ -476,26 +465,47 @@ static int num_listeners(grpc_server *server) {
return n;
}
+static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
+ server_unref(server);
+}
+
+static int num_channels(grpc_server *server) {
+ channel_data *chand;
+ int n = 0;
+ for (chand = server->root_channel_data.next;
+ chand != &server->root_channel_data; chand = chand->next) {
+ n++;
+ }
+ return n;
+}
+
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (!server->shutdown || server->shutdown_published) {
return;
}
- if (server->root_channel_data.next != &server->root_channel_data) {
- gpr_log(GPR_DEBUG,
- "Waiting for all channels to close before destroying server");
- return;
- }
- if (server->listeners_destroyed < num_listeners(server)) {
- gpr_log(GPR_DEBUG, "Waiting for all listeners to be destroyed (@ %d/%d)",
- server->listeners_destroyed, num_listeners(server));
+ if (server->root_channel_data.next != &server->root_channel_data ||
+ server->listeners_destroyed < num_listeners(server)) {
+ if (gpr_time_cmp(
+ gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), server->last_shutdown_message_time),
+ gpr_time_from_seconds(1)) >= 0) {
+ server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
+ gpr_log(GPR_DEBUG,
+ "Waiting for %d channels and %d/%d listeners to be destroyed"
+ " before shutting down server",
+ num_channels(server),
+ num_listeners(server) - server->listeners_destroyed,
+ num_listeners(server));
+ }
return;
}
server->shutdown_published = 1;
for (i = 0; i < server->num_shutdown_tags; i++) {
- grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
- NULL, 1);
+ server_ref(server);
+ grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1,
+ done_shutdown_event, server,
+ &server->shutdown_tags[i].completion);
}
}
@@ -504,10 +514,10 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
if (md->key == chand->path_key) {
- calld->path = grpc_mdstr_ref(md->value);
+ calld->path = GRPC_MDSTR_REF(md->value);
return NULL;
} else if (md->key == chand->authority_key) {
- calld->host = grpc_mdstr_ref(md->value);
+ calld->host = GRPC_MDSTR_REF(md->value);
return NULL;
}
return md;
@@ -541,27 +551,34 @@ static void server_on_recv(void *ptr, int success) {
case GRPC_STREAM_SEND_CLOSED:
break;
case GRPC_STREAM_RECV_CLOSED:
- gpr_mu_lock(&chand->server->mu_call);
+ gpr_mu_lock(&calld->mu_state);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
}
- gpr_mu_unlock(&chand->server->mu_call);
break;
case GRPC_STREAM_CLOSED:
- gpr_mu_lock(&chand->server->mu_call);
+ gpr_mu_lock(&calld->mu_state);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} else if (calld->state == PENDING) {
- call_list_remove(calld, PENDING_START);
calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
+ gpr_mu_lock(&chand->server->mu_call);
+ call_list_remove(calld, PENDING_START);
+ gpr_mu_unlock(&chand->server->mu_call);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
}
- gpr_mu_unlock(&chand->server->mu_call);
break;
}
@@ -623,6 +640,7 @@ static void init_call_elem(grpc_call_element *elem,
memset(calld, 0, sizeof(call_data));
calld->deadline = gpr_inf_future;
calld->call = grpc_call_from_top_element(elem);
+ gpr_mu_init(&calld->mu_state);
grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
@@ -634,21 +652,22 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- size_t i;
- gpr_mu_lock(&chand->server->mu_call);
- for (i = 0; i < CALL_LIST_COUNT; i++) {
- call_list_remove(elem->call_data, i);
+ if (calld->state == PENDING) {
+ gpr_mu_lock(&chand->server->mu_call);
+ call_list_remove(elem->call_data, PENDING_START);
+ gpr_mu_unlock(&chand->server->mu_call);
}
- gpr_mu_unlock(&chand->server->mu_call);
if (calld->host) {
- grpc_mdstr_unref(calld->host);
+ GRPC_MDSTR_UNREF(calld->host);
}
if (calld->path) {
- grpc_mdstr_unref(calld->path);
+ GRPC_MDSTR_UNREF(calld->path);
}
+ gpr_mu_destroy(&calld->mu_state);
+
server_unref(chand->server);
}
@@ -676,10 +695,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
if (chand->registered_methods) {
for (i = 0; i < chand->registered_method_slots; i++) {
if (chand->registered_methods[i].method) {
- grpc_mdstr_unref(chand->registered_methods[i].method);
+ GRPC_MDSTR_UNREF(chand->registered_methods[i].method);
}
if (chand->registered_methods[i].host) {
- grpc_mdstr_unref(chand->registered_methods[i].host);
+ GRPC_MDSTR_UNREF(chand->registered_methods[i].host);
}
}
gpr_free(chand->registered_methods);
@@ -691,8 +710,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
chand->next = chand->prev = chand;
maybe_finish_shutdown(chand->server);
gpr_mu_unlock(&chand->server->mu_global);
- grpc_mdstr_unref(chand->path_key);
- grpc_mdstr_unref(chand->authority_key);
+ GRPC_MDSTR_UNREF(chand->path_key);
+ GRPC_MDSTR_UNREF(chand->authority_key);
server_unref(chand->server);
}
}
@@ -910,15 +929,14 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
- requested_call_array requested_calls;
- size_t i;
+ requested_call *requests = NULL;
registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
- grpc_cq_begin_op(cq, NULL);
+ grpc_cq_begin_op(cq);
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
@@ -930,27 +948,21 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
return;
}
+ server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
+
channel_broadcaster_init(server, &broadcaster);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- requested_calls = server->requested_calls;
- memset(&server->requested_calls, 0, sizeof(server->requested_calls));
+ requests = server->requests;
+ server->requests = NULL;
for (rm = server->registered_methods; rm; rm = rm->next) {
- if (requested_calls.count + rm->requested.count >
- requested_calls.capacity) {
- requested_calls.capacity =
- GPR_MAX(requested_calls.count + rm->requested.count,
- 2 * requested_calls.capacity);
- requested_calls.calls =
- gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
- requested_calls.capacity);
+ while (rm->requests != NULL) {
+ requested_call *c = rm->requests;
+ rm->requests = c->next;
+ c->next = requests;
+ requests = c;
}
- memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
- sizeof(*requested_calls.calls) * rm->requested.count);
- requested_calls.count += rm->requested.count;
- gpr_free(rm->requested.calls);
- memset(&rm->requested, 0, sizeof(rm->requested));
}
gpr_mu_unlock(&server->mu_call);
@@ -959,10 +971,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */
- for (i = 0; i < requested_calls.count; i++) {
- fail_call(server, &requested_calls.calls[i]);
+ while (requests != NULL) {
+ requested_call *next = requests->next;
+ fail_call(server, requests);
+ requests = next;
}
- gpr_free(requested_calls.calls);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
@@ -1024,7 +1037,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) {
call_data *calld = NULL;
- requested_call_array *requested_calls = NULL;
+ requested_call **requests = NULL;
gpr_mu_lock(&server->mu_call);
if (server->shutdown) {
gpr_mu_unlock(&server->mu_call);
@@ -1035,22 +1048,25 @@ static grpc_call_error queue_call_request(grpc_server *server,
case BATCH_CALL:
calld =
call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
- requested_calls = &server->requested_calls;
+ requests = &server->requests;
break;
case REGISTERED_CALL:
calld = call_list_remove_head(
&rc->data.registered.registered_method->pending, PENDING_START);
- requested_calls = &rc->data.registered.registered_method->requested;
+ requests = &rc->data.registered.registered_method->requests;
break;
}
- if (calld) {
+ if (calld != NULL) {
+ gpr_mu_unlock(&server->mu_call);
+ gpr_mu_lock(&calld->mu_state);
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
- gpr_mu_unlock(&server->mu_call);
+ gpr_mu_unlock(&calld->mu_state);
begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {
- *requested_call_array_add(requested_calls) = *rc;
+ rc->next = *requests;
+ *requests = rc;
gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}
@@ -1061,22 +1077,23 @@ grpc_call_error grpc_server_request_call(
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
- requested_call rc;
+ requested_call *rc = gpr_malloc(sizeof(*rc));
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
- grpc_cq_begin_op(cq_for_notification, NULL);
- rc.type = BATCH_CALL;
- rc.tag = tag;
- rc.cq_bound_to_call = cq_bound_to_call;
- rc.cq_for_notification = cq_for_notification;
- rc.call = call;
- rc.data.batch.details = details;
- rc.data.batch.initial_metadata = initial_metadata;
- return queue_call_request(server, &rc);
+ grpc_cq_begin_op(cq_for_notification);
+ rc->type = BATCH_CALL;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->cq_for_notification = cq_for_notification;
+ rc->call = call;
+ rc->data.batch.details = details;
+ rc->data.batch.initial_metadata = initial_metadata;
+ return queue_call_request(server, rc);
}
grpc_call_error grpc_server_request_registered_call(
@@ -1084,22 +1101,23 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
- requested_call rc;
+ requested_call *rc = gpr_malloc(sizeof(*rc));
registered_method *registered_method = rm;
if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
- grpc_cq_begin_op(cq_for_notification, NULL);
- rc.type = REGISTERED_CALL;
- rc.tag = tag;
- rc.cq_bound_to_call = cq_bound_to_call;
- rc.cq_for_notification = cq_for_notification;
- rc.call = call;
- rc.data.registered.registered_method = registered_method;
- rc.data.registered.deadline = deadline;
- rc.data.registered.initial_metadata = initial_metadata;
- rc.data.registered.optional_payload = optional_payload;
- return queue_call_request(server, &rc);
+ grpc_cq_begin_op(cq_for_notification);
+ rc->type = REGISTERED_CALL;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->cq_for_notification = cq_for_notification;
+ rc->call = call;
+ rc->data.registered.registered_method = registered_method;
+ rc->data.registered.deadline = deadline;
+ rc->data.registered.initial_metadata = initial_metadata;
+ rc->data.registered.optional_payload = optional_payload;
+ return queue_call_request(server, rc);
}
static void publish_registered_or_batch(grpc_call *call, int success,
@@ -1166,8 +1184,11 @@ static void begin_call(grpc_server *server, call_data *calld,
}
GRPC_CALL_INTERNAL_REF(calld->call, "server");
- grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
- rc->tag);
+ grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc);
+}
+
+static void done_request_event(void *req, grpc_cq_completion *c) {
+ gpr_free(req);
}
static void fail_call(grpc_server *server, requested_call *rc) {
@@ -1180,15 +1201,19 @@ static void fail_call(grpc_server *server, requested_call *rc) {
rc->data.registered.initial_metadata->count = 0;
break;
}
- grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0);
+ grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
+ &rc->completion);
}
static void publish_registered_or_batch(grpc_call *call, int success,
- void *tag) {
+ void *prc) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ requested_call *rc = prc;
call_data *calld = elem->call_data;
- grpc_cq_end_op(calld->cq_new, tag, call, success);
+ grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
+ &rc->completion);
+ GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {