aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-16 07:39:59 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-16 07:39:59 -0700
commitf101af1ab4c553ff2a8cd8f72e8ca156ef188b86 (patch)
tree5b8b6c316210ae1ae094fe17923bb0f8b44b2a8d /src/core
parent0327ce5ca851440f168ae3a8b359e4a13a88f2e8 (diff)
parent097468d43ad44c087a01abee5044bd683c144d18 (diff)
Merge pull request #2468 from yang-g/cleanup_handshaking_channels_when_server_shutdown
Clean up handshaking server channels properly
Diffstat (limited to 'src/core')
-rw-r--r--src/core/httpcli/httpcli.c1
-rw-r--r--src/core/security/secure_transport_setup.c29
-rw-r--r--src/core/security/secure_transport_setup.h2
-rw-r--r--src/core/security/server_secure_chttp2.c47
-rw-r--r--src/core/surface/secure_channel_create.c1
5 files changed, 67 insertions, 13 deletions
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 3f5557e08e..65997d5f44 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -165,6 +165,7 @@ static void start_write(internal_request *req) {
static void on_secure_transport_setup_done(void *rp,
grpc_security_status status,
+ grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
internal_request *req = rp;
if (status != GRPC_SECURITY_OK) {
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index 731b382f09..0c3572b53c 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -47,7 +47,8 @@ typedef struct {
tsi_handshaker *handshaker;
unsigned char *handshake_buffer;
size_t handshake_buffer_size;
- grpc_endpoint *endpoint;
+ grpc_endpoint *wrapped_endpoint;
+ grpc_endpoint *secure_endpoint;
gpr_slice_buffer left_overs;
grpc_secure_transport_setup_done_cb cb;
void *user_data;
@@ -63,13 +64,16 @@ static void on_handshake_data_sent_to_peer(void *setup,
static void secure_transport_setup_done(grpc_secure_transport_setup *s,
int is_success) {
if (is_success) {
- s->cb(s->user_data, GRPC_SECURITY_OK, s->endpoint);
+ s->cb(s->user_data, GRPC_SECURITY_OK, s->wrapped_endpoint,
+ s->secure_endpoint);
} else {
- if (s->endpoint != NULL) {
- grpc_endpoint_shutdown(s->endpoint);
- grpc_endpoint_destroy(s->endpoint);
+ if (s->secure_endpoint != NULL) {
+ grpc_endpoint_shutdown(s->secure_endpoint);
+ grpc_endpoint_destroy(s->secure_endpoint);
+ } else {
+ grpc_endpoint_destroy(s->wrapped_endpoint);
}
- s->cb(s->user_data, GRPC_SECURITY_ERROR, NULL);
+ s->cb(s->user_data, GRPC_SECURITY_ERROR, s->wrapped_endpoint, NULL);
}
if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker);
if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer);
@@ -95,8 +99,9 @@ static void on_peer_checked(void *user_data, grpc_security_status status) {
secure_transport_setup_done(s, 0);
return;
}
- s->endpoint = grpc_secure_endpoint_create(
- protector, s->endpoint, s->left_overs.slices, s->left_overs.count);
+ s->secure_endpoint =
+ grpc_secure_endpoint_create(protector, s->wrapped_endpoint,
+ s->left_overs.slices, s->left_overs.count);
secure_transport_setup_done(s, 1);
return;
}
@@ -152,7 +157,7 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset);
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- write_status = grpc_endpoint_write(s->endpoint, &to_send, 1,
+ write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1,
on_handshake_data_sent_to_peer, s);
if (write_status == GRPC_ENDPOINT_WRITE_ERROR) {
gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
@@ -198,7 +203,7 @@ static void on_handshake_data_received_from_peer(
if (result == TSI_INCOMPLETE_DATA) {
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- grpc_endpoint_notify_on_read(s->endpoint,
+ grpc_endpoint_notify_on_read(s->wrapped_endpoint,
on_handshake_data_received_from_peer, setup);
cleanup_slices(slices, nslices);
return;
@@ -256,7 +261,7 @@ static void on_handshake_data_sent_to_peer(void *setup,
if (tsi_handshaker_is_in_progress(s->handshaker)) {
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- grpc_endpoint_notify_on_read(s->endpoint,
+ grpc_endpoint_notify_on_read(s->wrapped_endpoint,
on_handshake_data_received_from_peer, setup);
} else {
check_peer(s);
@@ -280,7 +285,7 @@ void grpc_setup_secure_transport(grpc_security_connector *connector,
GRPC_SECURITY_CONNECTOR_REF(connector, "secure_transport_setup");
s->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
s->handshake_buffer = gpr_malloc(s->handshake_buffer_size);
- s->endpoint = nonsecure_endpoint;
+ s->wrapped_endpoint = nonsecure_endpoint;
s->user_data = user_data;
s->cb = cb;
gpr_slice_buffer_init(&s->left_overs);
diff --git a/src/core/security/secure_transport_setup.h b/src/core/security/secure_transport_setup.h
index 58701c461d..29025f5236 100644
--- a/src/core/security/secure_transport_setup.h
+++ b/src/core/security/secure_transport_setup.h
@@ -42,7 +42,7 @@
/* Ownership of the secure_endpoint is transfered. */
typedef void (*grpc_secure_transport_setup_done_cb)(
void *user_data, grpc_security_status status,
- grpc_endpoint *secure_endpoint);
+ grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint);
/* Calls the callback upon completion. */
void grpc_setup_secure_transport(grpc_security_connector *connector,
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 8a7ada07af..3717b8989f 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -51,10 +51,16 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+typedef struct tcp_endpoint_list {
+ grpc_endpoint *tcp_endpoint;
+ struct tcp_endpoint_list *next;
+} tcp_endpoint_list;
+
typedef struct grpc_server_secure_state {
grpc_server *server;
grpc_tcp_server *tcp;
grpc_security_connector *sc;
+ tcp_endpoint_list *handshaking_tcp_endpoints;
int is_shutdown;
gpr_mu mu;
gpr_refcount refcount;
@@ -88,14 +94,37 @@ static void setup_transport(void *statep, grpc_transport *transport,
grpc_channel_args_destroy(args_copy);
}
+static int remove_tcp_from_list_locked(grpc_server_secure_state *state,
+ grpc_endpoint *tcp) {
+ tcp_endpoint_list *node = state->handshaking_tcp_endpoints;
+ tcp_endpoint_list *tmp = NULL;
+ if (node && node->tcp_endpoint == tcp) {
+ state->handshaking_tcp_endpoints = state->handshaking_tcp_endpoints->next;
+ gpr_free(node);
+ return 0;
+ }
+ while (node) {
+ if (node->next->tcp_endpoint == tcp) {
+ tmp = node->next;
+ node->next = node->next->next;
+ gpr_free(tmp);
+ return 0;
+ }
+ node = node->next;
+ }
+ return -1;
+}
+
static void on_secure_transport_setup_done(void *statep,
grpc_security_status status,
+ grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
grpc_server_secure_state *state = statep;
grpc_transport *transport;
grpc_mdctx *mdctx;
if (status == GRPC_SECURITY_OK) {
gpr_mu_lock(&state->mu);
+ remove_tcp_from_list_locked(state, wrapped_endpoint);
if (!state->is_shutdown) {
mdctx = grpc_mdctx_create();
transport = grpc_create_chttp2_transport(
@@ -110,6 +139,9 @@ static void on_secure_transport_setup_done(void *statep,
}
gpr_mu_unlock(&state->mu);
} else {
+ gpr_mu_lock(&state->mu);
+ remove_tcp_from_list_locked(state, wrapped_endpoint);
+ gpr_mu_unlock(&state->mu);
gpr_log(GPR_ERROR, "Secure transport failed with error %d", status);
}
state_unref(state);
@@ -117,7 +149,14 @@ static void on_secure_transport_setup_done(void *statep,
static void on_accept(void *statep, grpc_endpoint *tcp) {
grpc_server_secure_state *state = statep;
+ tcp_endpoint_list *node;
state_ref(state);
+ node = gpr_malloc(sizeof(tcp_endpoint_list));
+ node->tcp_endpoint = tcp;
+ gpr_mu_lock(&state->mu);
+ node->next = state->handshaking_tcp_endpoints;
+ state->handshaking_tcp_endpoints = node;
+ gpr_mu_unlock(&state->mu);
grpc_setup_secure_transport(state->sc, tcp, on_secure_transport_setup_done,
state);
}
@@ -132,6 +171,13 @@ static void start(grpc_server *server, void *statep, grpc_pollset **pollsets,
static void destroy_done(void *statep) {
grpc_server_secure_state *state = statep;
grpc_server_listener_destroy_done(state->server);
+ gpr_mu_lock(&state->mu);
+ while (state->handshaking_tcp_endpoints != NULL) {
+ grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint);
+ remove_tcp_from_list_locked(state,
+ state->handshaking_tcp_endpoints->tcp_endpoint);
+ }
+ gpr_mu_unlock(&state->mu);
state_unref(state);
}
@@ -209,6 +255,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
state->server = server;
state->tcp = tcp;
state->sc = sc;
+ state->handshaking_tcp_endpoints = NULL;
state->is_shutdown = 0;
gpr_mu_init(&state->mu);
gpr_ref_init(&state->refcount, 1);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 34ee3f8400..f3c7d8397b 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -75,6 +75,7 @@ static void connector_unref(grpc_connector *con) {
static void on_secure_transport_setup_done(void *arg,
grpc_security_status status,
+ grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
connector *c = arg;
grpc_iomgr_closure *notify;