diff options
Diffstat (limited to 'src/core/surface/server.c')
-rw-r--r-- | src/core/surface/server.c | 281 |
1 files changed, 153 insertions, 128 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 341ca2942c..4dc51bf031 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -72,12 +72,14 @@ typedef struct { typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; -typedef struct { +typedef struct requested_call { requested_call_type type; + struct requested_call *next; void *tag; grpc_completion_queue *cq_bound_to_call; grpc_completion_queue *cq_for_notification; grpc_call **call; + grpc_cq_completion completion; union { struct { grpc_call_details *details; @@ -92,17 +94,11 @@ typedef struct { } data; } requested_call; -typedef struct { - requested_call *calls; - size_t count; - size_t capacity; -} requested_call_array; - struct registered_method { char *method; char *host; call_data *pending; - requested_call_array requested; + requested_call *requests; registered_method *next; }; @@ -131,6 +127,7 @@ struct channel_data { typedef struct shutdown_tag { void *tag; grpc_completion_queue *cq; + grpc_cq_completion completion; } shutdown_tag; struct grpc_server { @@ -153,7 +150,7 @@ struct grpc_server { gpr_mu mu_call; /* mutex for call-specific state */ registered_method *registered_methods; - requested_call_array requested_calls; + requested_call *requests; gpr_uint8 shutdown; gpr_uint8 shutdown_published; @@ -166,6 +163,9 @@ struct grpc_server { listener *listeners; int listeners_destroyed; gpr_refcount internal_refcount; + + /** when did we print the last shutdown progress message */ + gpr_timespec last_shutdown_message_time; }; typedef enum { @@ -182,7 +182,11 @@ typedef enum { struct call_data { grpc_call *call; + /** protects state */ + gpr_mu mu_state; + /** the current state of a call - see call_state */ call_state state; + grpc_mdstr *path; grpc_mdstr *host; gpr_timespec deadline; @@ -266,7 +270,8 @@ static void send_shutdown(grpc_channel *channel, int send_goaway, } static void channel_broadcaster_shutdown(channel_broadcaster *cb, - int send_goaway, int force_disconnect) { + int send_goaway, + int force_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { @@ -325,22 +330,6 @@ static int call_list_remove(call_data *call, call_list list) { return 1; } -static void requested_call_array_destroy(requested_call_array *array) { - gpr_free(array->calls); -} - -static requested_call *requested_call_array_add(requested_call_array *array) { - requested_call *rc; - if (array->count == array->capacity) { - array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2); - array->calls = - gpr_realloc(array->calls, sizeof(requested_call) * array->capacity); - } - rc = &array->calls[array->count++]; - memset(rc, 0, sizeof(*rc)); - return rc; -} - static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } @@ -352,12 +341,10 @@ static void server_delete(grpc_server *server) { gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); gpr_free(server->channel_filters); - requested_call_array_destroy(&server->requested_calls); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; gpr_free(rm->method); gpr_free(rm->host); - requested_call_array_destroy(&rm->requested); gpr_free(rm); } for (i = 0; i < server->cq_count; i++) { @@ -403,21 +390,26 @@ static void destroy_channel(channel_data *chand) { grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure); } -static void finish_start_new_rpc_and_unlock(grpc_server *server, - grpc_call_element *elem, - call_data **pending_root, - requested_call_array *array) { - requested_call rc; +static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, + call_data **pending_root, + requested_call **requests) { + requested_call *rc; call_data *calld = elem->call_data; - if (array->count == 0) { + gpr_mu_lock(&server->mu_call); + rc = *requests; + if (rc == NULL) { + gpr_mu_lock(&calld->mu_state); calld->state = PENDING; + gpr_mu_unlock(&calld->mu_state); call_list_join(pending_root, calld, PENDING_START); gpr_mu_unlock(&server->mu_call); } else { - rc = array->calls[--array->count]; + *requests = rc->next; + gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; + gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&server->mu_call); - begin_call(server, calld, &rc); + begin_call(server, calld, rc); } } @@ -429,20 +421,18 @@ static void start_new_rpc(grpc_call_element *elem) { gpr_uint32 hash; channel_registered_method *rm; - gpr_mu_lock(&server->mu_call); if (chand->registered_methods && calld->path && calld->host) { /* TODO(ctiller): unify these two searches */ /* check for an exact match with host */ hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash); - for (i = 0; i < chand->registered_method_max_probes; i++) { + for (i = 0; i <= chand->registered_method_max_probes; i++) { rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; if (!rm) break; if (rm->host != calld->host) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc_and_unlock(server, elem, - &rm->server_registered_method->pending, - &rm->server_registered_method->requested); + finish_start_new_rpc(server, elem, &rm->server_registered_method->pending, + &rm->server_registered_method->requests); return; } /* check for a wildcard method definition (no host set) */ @@ -453,14 +443,13 @@ static void start_new_rpc(grpc_call_element *elem) { if (!rm) break; if (rm->host != NULL) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc_and_unlock(server, elem, - &rm->server_registered_method->pending, - &rm->server_registered_method->requested); + finish_start_new_rpc(server, elem, &rm->server_registered_method->pending, + &rm->server_registered_method->requests); return; } } - finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], - &server->requested_calls); + finish_start_new_rpc(server, elem, &server->lists[PENDING_START], + &server->requests); } static void kill_zombie(void *elem, int success) { @@ -476,26 +465,47 @@ static int num_listeners(grpc_server *server) { return n; } +static void done_shutdown_event(void *server, grpc_cq_completion *completion) { + server_unref(server); +} + +static int num_channels(grpc_server *server) { + channel_data *chand; + int n = 0; + for (chand = server->root_channel_data.next; + chand != &server->root_channel_data; chand = chand->next) { + n++; + } + return n; +} + static void maybe_finish_shutdown(grpc_server *server) { size_t i; if (!server->shutdown || server->shutdown_published) { return; } - if (server->root_channel_data.next != &server->root_channel_data) { - gpr_log(GPR_DEBUG, - "Waiting for all channels to close before destroying server"); - return; - } - if (server->listeners_destroyed < num_listeners(server)) { - gpr_log(GPR_DEBUG, "Waiting for all listeners to be destroyed (@ %d/%d)", - server->listeners_destroyed, num_listeners(server)); + if (server->root_channel_data.next != &server->root_channel_data || + server->listeners_destroyed < num_listeners(server)) { + if (gpr_time_cmp( + gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), server->last_shutdown_message_time), + gpr_time_from_seconds(1)) >= 0) { + server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); + gpr_log(GPR_DEBUG, + "Waiting for %d channels and %d/%d listeners to be destroyed" + " before shutting down server", + num_channels(server), + num_listeners(server) - server->listeners_destroyed, + num_listeners(server)); + } return; } server->shutdown_published = 1; for (i = 0; i < server->num_shutdown_tags; i++) { - grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, - NULL, 1); + server_ref(server); + grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1, + done_shutdown_event, server, + &server->shutdown_tags[i].completion); } } @@ -504,10 +514,10 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; if (md->key == chand->path_key) { - calld->path = grpc_mdstr_ref(md->value); + calld->path = GRPC_MDSTR_REF(md->value); return NULL; } else if (md->key == chand->authority_key) { - calld->host = grpc_mdstr_ref(md->value); + calld->host = GRPC_MDSTR_REF(md->value); return NULL; } return md; @@ -541,27 +551,34 @@ static void server_on_recv(void *ptr, int success) { case GRPC_STREAM_SEND_CLOSED: break; case GRPC_STREAM_RECV_CLOSED: - gpr_mu_lock(&chand->server->mu_call); + gpr_mu_lock(&calld->mu_state); if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); + } else { + gpr_mu_unlock(&calld->mu_state); } - gpr_mu_unlock(&chand->server->mu_call); break; case GRPC_STREAM_CLOSED: - gpr_mu_lock(&chand->server->mu_call); + gpr_mu_lock(&calld->mu_state); if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); } else if (calld->state == PENDING) { - call_list_remove(calld, PENDING_START); calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); + gpr_mu_lock(&chand->server->mu_call); + call_list_remove(calld, PENDING_START); + gpr_mu_unlock(&chand->server->mu_call); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); + } else { + gpr_mu_unlock(&calld->mu_state); } - gpr_mu_unlock(&chand->server->mu_call); break; } @@ -623,6 +640,7 @@ static void init_call_elem(grpc_call_element *elem, memset(calld, 0, sizeof(call_data)); calld->deadline = gpr_inf_future; calld->call = grpc_call_from_top_element(elem); + gpr_mu_init(&calld->mu_state); grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); @@ -634,21 +652,22 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - size_t i; - gpr_mu_lock(&chand->server->mu_call); - for (i = 0; i < CALL_LIST_COUNT; i++) { - call_list_remove(elem->call_data, i); + if (calld->state == PENDING) { + gpr_mu_lock(&chand->server->mu_call); + call_list_remove(elem->call_data, PENDING_START); + gpr_mu_unlock(&chand->server->mu_call); } - gpr_mu_unlock(&chand->server->mu_call); if (calld->host) { - grpc_mdstr_unref(calld->host); + GRPC_MDSTR_UNREF(calld->host); } if (calld->path) { - grpc_mdstr_unref(calld->path); + GRPC_MDSTR_UNREF(calld->path); } + gpr_mu_destroy(&calld->mu_state); + server_unref(chand->server); } @@ -676,10 +695,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) { if (chand->registered_methods) { for (i = 0; i < chand->registered_method_slots; i++) { if (chand->registered_methods[i].method) { - grpc_mdstr_unref(chand->registered_methods[i].method); + GRPC_MDSTR_UNREF(chand->registered_methods[i].method); } if (chand->registered_methods[i].host) { - grpc_mdstr_unref(chand->registered_methods[i].host); + GRPC_MDSTR_UNREF(chand->registered_methods[i].host); } } gpr_free(chand->registered_methods); @@ -691,8 +710,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { chand->next = chand->prev = chand; maybe_finish_shutdown(chand->server); gpr_mu_unlock(&chand->server->mu_global); - grpc_mdstr_unref(chand->path_key); - grpc_mdstr_unref(chand->authority_key); + GRPC_MDSTR_UNREF(chand->path_key); + GRPC_MDSTR_UNREF(chand->authority_key); server_unref(chand->server); } } @@ -910,15 +929,14 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; - requested_call_array requested_calls; - size_t i; + requested_call *requests = NULL; registered_method *rm; shutdown_tag *sdt; channel_broadcaster broadcaster; /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); - grpc_cq_begin_op(cq, NULL); + grpc_cq_begin_op(cq); server->shutdown_tags = gpr_realloc(server->shutdown_tags, sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)); @@ -930,27 +948,21 @@ void grpc_server_shutdown_and_notify(grpc_server *server, return; } + server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); + channel_broadcaster_init(server, &broadcaster); /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - requested_calls = server->requested_calls; - memset(&server->requested_calls, 0, sizeof(server->requested_calls)); + requests = server->requests; + server->requests = NULL; for (rm = server->registered_methods; rm; rm = rm->next) { - if (requested_calls.count + rm->requested.count > - requested_calls.capacity) { - requested_calls.capacity = - GPR_MAX(requested_calls.count + rm->requested.count, - 2 * requested_calls.capacity); - requested_calls.calls = - gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) * - requested_calls.capacity); + while (rm->requests != NULL) { + requested_call *c = rm->requests; + rm->requests = c->next; + c->next = requests; + requests = c; } - memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls, - sizeof(*requested_calls.calls) * rm->requested.count); - requested_calls.count += rm->requested.count; - gpr_free(rm->requested.calls); - memset(&rm->requested, 0, sizeof(rm->requested)); } gpr_mu_unlock(&server->mu_call); @@ -959,10 +971,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server, gpr_mu_unlock(&server->mu_global); /* terminate all the requested calls */ - for (i = 0; i < requested_calls.count; i++) { - fail_call(server, &requested_calls.calls[i]); + while (requests != NULL) { + requested_call *next = requests->next; + fail_call(server, requests); + requests = next; } - gpr_free(requested_calls.calls); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { @@ -1024,7 +1037,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg, static grpc_call_error queue_call_request(grpc_server *server, requested_call *rc) { call_data *calld = NULL; - requested_call_array *requested_calls = NULL; + requested_call **requests = NULL; gpr_mu_lock(&server->mu_call); if (server->shutdown) { gpr_mu_unlock(&server->mu_call); @@ -1035,22 +1048,25 @@ static grpc_call_error queue_call_request(grpc_server *server, case BATCH_CALL: calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START); - requested_calls = &server->requested_calls; + requests = &server->requests; break; case REGISTERED_CALL: calld = call_list_remove_head( &rc->data.registered.registered_method->pending, PENDING_START); - requested_calls = &rc->data.registered.registered_method->requested; + requests = &rc->data.registered.registered_method->requests; break; } - if (calld) { + if (calld != NULL) { + gpr_mu_unlock(&server->mu_call); + gpr_mu_lock(&calld->mu_state); GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; - gpr_mu_unlock(&server->mu_call); + gpr_mu_unlock(&calld->mu_state); begin_call(server, calld, rc); return GRPC_CALL_OK; } else { - *requested_call_array_add(requested_calls) = *rc; + rc->next = *requests; + *requests = rc; gpr_mu_unlock(&server->mu_call); return GRPC_CALL_OK; } @@ -1061,22 +1077,23 @@ grpc_call_error grpc_server_request_call( grpc_metadata_array *initial_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { - requested_call rc; + requested_call *rc = gpr_malloc(sizeof(*rc)); GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag); if (!grpc_cq_is_server_cq(cq_for_notification)) { + gpr_free(rc); return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; } - grpc_cq_begin_op(cq_for_notification, NULL); - rc.type = BATCH_CALL; - rc.tag = tag; - rc.cq_bound_to_call = cq_bound_to_call; - rc.cq_for_notification = cq_for_notification; - rc.call = call; - rc.data.batch.details = details; - rc.data.batch.initial_metadata = initial_metadata; - return queue_call_request(server, &rc); + grpc_cq_begin_op(cq_for_notification); + rc->type = BATCH_CALL; + rc->tag = tag; + rc->cq_bound_to_call = cq_bound_to_call; + rc->cq_for_notification = cq_for_notification; + rc->call = call; + rc->data.batch.details = details; + rc->data.batch.initial_metadata = initial_metadata; + return queue_call_request(server, rc); } grpc_call_error grpc_server_request_registered_call( @@ -1084,22 +1101,23 @@ grpc_call_error grpc_server_request_registered_call( grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { - requested_call rc; + requested_call *rc = gpr_malloc(sizeof(*rc)); registered_method *registered_method = rm; if (!grpc_cq_is_server_cq(cq_for_notification)) { + gpr_free(rc); return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; } - grpc_cq_begin_op(cq_for_notification, NULL); - rc.type = REGISTERED_CALL; - rc.tag = tag; - rc.cq_bound_to_call = cq_bound_to_call; - rc.cq_for_notification = cq_for_notification; - rc.call = call; - rc.data.registered.registered_method = registered_method; - rc.data.registered.deadline = deadline; - rc.data.registered.initial_metadata = initial_metadata; - rc.data.registered.optional_payload = optional_payload; - return queue_call_request(server, &rc); + grpc_cq_begin_op(cq_for_notification); + rc->type = REGISTERED_CALL; + rc->tag = tag; + rc->cq_bound_to_call = cq_bound_to_call; + rc->cq_for_notification = cq_for_notification; + rc->call = call; + rc->data.registered.registered_method = registered_method; + rc->data.registered.deadline = deadline; + rc->data.registered.initial_metadata = initial_metadata; + rc->data.registered.optional_payload = optional_payload; + return queue_call_request(server, rc); } static void publish_registered_or_batch(grpc_call *call, int success, @@ -1166,8 +1184,11 @@ static void begin_call(grpc_server *server, call_data *calld, } GRPC_CALL_INTERNAL_REF(calld->call, "server"); - grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, - rc->tag); + grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc); +} + +static void done_request_event(void *req, grpc_cq_completion *c) { + gpr_free(req); } static void fail_call(grpc_server *server, requested_call *rc) { @@ -1180,15 +1201,19 @@ static void fail_call(grpc_server *server, requested_call *rc) { rc->data.registered.initial_metadata->count = 0; break; } - grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0); + grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, + &rc->completion); } static void publish_registered_or_batch(grpc_call *call, int success, - void *tag) { + void *prc) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + requested_call *rc = prc; call_data *calld = elem->call_data; - grpc_cq_end_op(calld->cq_new, tag, call, success); + grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, + &rc->completion); + GRPC_CALL_INTERNAL_UNREF(call, "server", 0); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { |