aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-04 22:35:00 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-04 22:35:00 -0700
commit092d8d1b7f4acd55a7bb89b10deb8e4c33ad6930 (patch)
tree4d4b64d2bd4877eb1332fa59fd5bd428a4773d4d /src/core/surface
parent51fbeb0c7c1f9d6f92fd66adcb1c788bdfce29e2 (diff)
Remove ALL_CALLS list
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/server.c104
1 files changed, 43 insertions, 61 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index f29d47c17c..ee394bb33a 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -51,7 +51,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
-typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
+typedef enum { PENDING_START, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
@@ -183,6 +183,10 @@ typedef enum {
struct call_data {
grpc_call *call;
+ /** is this call counted towards the channels total
+ number of calls? */
+ gpr_uint8 active;
+
call_state state;
grpc_mdstr *path;
grpc_mdstr *host;
@@ -280,15 +284,27 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
}
static void channel_broadcaster_shutdown(channel_broadcaster *cb,
- int send_goaway, int send_disconnect) {
+ int send_goaway, int force_disconnect) {
size_t i;
+ if (send_goaway) {
+ for (i = 0; i < cb->num_channels; i++) {
+ send_shutdown(cb->channels[i], 1, 0);
+ }
+ }
+ if (force_disconnect) {
+ for (i = 0; i < cb->num_channels; i++) {
+ send_shutdown(cb->channels[i], 0, 1);
+ }
+ } else {
+ for (i = 0; i < cb->num_disconnects; i++) {
+ send_shutdown(cb->disconnects[i], 0, 1);
+ }
+ }
for (i = 0; i < cb->num_channels; i++) {
- send_shutdown(cb->channels[i], 1, 0);
GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
}
for (i = 0; i < cb->num_disconnects; i++) {
- send_shutdown(cb->disconnects[i], 0, 1);
GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect");
}
gpr_free(cb->channels);
@@ -501,15 +517,6 @@ static void maybe_finish_shutdown(grpc_server *server) {
return;
}
- gpr_mu_lock(&server->mu_call);
- if (server->lists[ALL_CALLS] != NULL) {
- gpr_log(GPR_DEBUG,
- "Waiting for all calls to finish before destroying server");
- gpr_mu_unlock(&server->mu_call);
- return;
- }
- gpr_mu_unlock(&server->mu_call);
-
if (server->root_channel_data.next != &server->root_channel_data) {
gpr_log(GPR_DEBUG,
"Waiting for all channels to close before destroying server");
@@ -601,7 +608,8 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
- remove_res = call_list_remove(calld, ALL_CALLS);
+ remove_res = calld->active;
+ calld->active = 0;
gpr_mu_unlock(&chand->server->mu_call);
gpr_mu_lock(&chand->server->mu_global);
if (remove_res) {
@@ -676,13 +684,10 @@ static void init_call_elem(grpc_call_element *elem,
memset(calld, 0, sizeof(call_data));
calld->deadline = gpr_inf_future;
calld->call = grpc_call_from_top_element(elem);
+ calld->active = 1;
grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
- gpr_mu_lock(&chand->server->mu_call);
- call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
- gpr_mu_unlock(&chand->server->mu_call);
-
gpr_mu_lock(&chand->server->mu_global);
chand->num_calls++;
gpr_mu_unlock(&chand->server->mu_global);
@@ -695,20 +700,31 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- int removed[CALL_LIST_COUNT];
+ int disconnect = 0;
+ int active;
size_t i;
gpr_mu_lock(&chand->server->mu_call);
for (i = 0; i < CALL_LIST_COUNT; i++) {
- removed[i] = call_list_remove(elem->call_data, i);
+ call_list_remove(elem->call_data, i);
}
+ active = calld->active;
+ calld->active = 0;
gpr_mu_unlock(&chand->server->mu_call);
- if (removed[ALL_CALLS]) {
+ if (active) {
gpr_mu_lock(&chand->server->mu_global);
- decrement_call_count(chand);
+ disconnect = decrement_call_count(chand);
+ if (disconnect) {
+ GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect");
+ }
gpr_mu_unlock(&chand->server->mu_global);
}
+ if (disconnect) {
+ send_shutdown(chand->channel, 0, 1);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect");
+ }
+
if (calld->host) {
grpc_mdstr_unref(calld->host);
}
@@ -1049,47 +1065,13 @@ void grpc_server_listener_destroy_done(void *s) {
}
void grpc_server_cancel_all_calls(grpc_server *server) {
- call_data *calld;
- grpc_call **calls;
- size_t call_count;
- size_t call_capacity;
- int is_first = 1;
- size_t i;
-
- gpr_mu_lock(&server->mu_call);
-
- GPR_ASSERT(server->shutdown);
-
- if (!server->lists[ALL_CALLS]) {
- gpr_mu_unlock(&server->mu_call);
- return;
- }
-
- call_capacity = 8;
- call_count = 0;
- calls = gpr_malloc(sizeof(grpc_call *) * call_capacity);
-
- for (calld = server->lists[ALL_CALLS];
- calld != server->lists[ALL_CALLS] || is_first;
- calld = calld->links[ALL_CALLS].next) {
- if (call_count == call_capacity) {
- call_capacity *= 2;
- calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity);
- }
- calls[call_count++] = calld->call;
- GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all");
- is_first = 0;
- }
-
- gpr_mu_unlock(&server->mu_call);
+ channel_broadcaster broadcaster;
- for (i = 0; i < call_count; i++) {
- grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE,
- "Unavailable");
- GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1);
- }
+ gpr_mu_lock(&server->mu_global);
+ channel_broadcaster_init(server, &broadcaster);
+ gpr_mu_unlock(&server->mu_global);
- gpr_free(calls);
+ channel_broadcaster_shutdown(&broadcaster, 0, 1);
}
void grpc_server_destroy(grpc_server *server) {