From 7d9f276ea278313f6ba1930931b5dde14167b8ac Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 4 Aug 2016 11:06:49 -0700 Subject: Change handshaker API to use a read buffer to pass leftover bytes read between handshakers. --- .../transport/chttp2/client/insecure/channel_create.c | 19 +++++++++++++------ .../chttp2/client/insecure/channel_create_posix.c | 2 +- .../chttp2/client/secure/secure_channel_create.c | 13 +++++++------ .../transport/chttp2/server/insecure/server_chttp2.c | 6 ++++-- .../chttp2/server/insecure/server_chttp2_posix.c | 2 +- .../chttp2/server/secure/server_secure_chttp2.c | 12 +++++++----- .../ext/transport/chttp2/transport/chttp2_transport.c | 7 +++++-- .../ext/transport/chttp2/transport/chttp2_transport.h | 4 +++- 8 files changed, 41 insertions(+), 24 deletions(-) (limited to 'src/core/ext/transport') diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 6f6855584a..cbaa75a90a 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -88,14 +88,21 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, } static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, void *user_data, + grpc_channel_args *args, + gpr_slice_buffer *read_buffer, void *user_data, grpc_error *error) { connector *c = user_data; - c->result->transport = - grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1); - GPR_ASSERT(c->result->transport); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); - c->result->channel_args = args; + if (error != GRPC_ERROR_NONE) { + grpc_channel_args_destroy(args); + gpr_free(read_buffer); + } else { + c->result->transport = + grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1); + GPR_ASSERT(c->result->transport); + grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, + read_buffer); + c->result->channel_args = args; + } grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c index ca435c25ce..b2c5e5b088 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c @@ -75,7 +75,7 @@ grpc_channel *grpc_insecure_channel_create_from_fd( grpc_channel *channel = grpc_channel_create( &exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); grpc_channel_args_destroy(final_args); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL, 0); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 4e33b6fa61..9e2bdd758f 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -114,8 +114,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&c->mu); c->result->transport = grpc_create_chttp2_transport( exec_ctx, c->args.channel_args, secure_endpoint, 1); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, - 0); + grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL); auth_context_arg = grpc_auth_context_to_arg(auth_context); c->result->channel_args = grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1); @@ -126,10 +125,13 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, void *user_data, + grpc_channel_args *args, + gpr_slice_buffer *read_buffer, void *user_data, grpc_error *error) { connector *c = user_data; + c->tmp_args = args; if (error != GRPC_ERROR_NONE) { + gpr_free(read_buffer); grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); @@ -137,10 +139,9 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, // TODO(roth, jboeuf): Convert security connector handshaking to use new // handshake API, and then move the code from on_secure_handshake_done() // into this function. - c->tmp_args = args; grpc_channel_security_connector_do_handshake( - exec_ctx, c->security_connector, endpoint, c->args.deadline, - on_secure_handshake_done, c); + exec_ctx, c->security_connector, endpoint, read_buffer, + c->args.deadline, on_secure_handshake_done, c); } } diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 9cd374777e..f0e07429fa 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -55,7 +55,8 @@ typedef struct server_connect_state { } server_connect_state; static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, void *user_data, + grpc_channel_args *args, + gpr_slice_buffer *read_buffer, void *user_data, grpc_error *error) { server_connect_state *state = user_data; if (error != GRPC_ERROR_NONE) { @@ -64,6 +65,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, grpc_error_free_string(error_str); GRPC_ERROR_UNREF(error); grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr); + gpr_free(read_buffer); } else { // Beware that the call to grpc_create_chttp2_transport() has to happen // before grpc_tcp_server_destroy(). This is fine here, but similar code @@ -75,7 +77,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, grpc_server_setup_transport(exec_ctx, state->server, transport, state->accepting_pollset, grpc_server_get_channel_args(state->server)); - grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); + grpc_chttp2_transport_start_reading(exec_ctx, transport, read_buffer); } // Clean up. grpc_channel_args_destroy(args); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c index 96bf4d6f30..4350543c27 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c @@ -67,7 +67,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server, &exec_ctx, server_args, server_endpoint, 0 /* is_client */); grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, grpc_cq_pollset(cq)); grpc_server_setup_transport(&exec_ctx, server, transport, NULL, server_args); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL, 0); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index ccea15a648..da3e284fcf 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -111,7 +111,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, grpc_server_setup_transport(exec_ctx, state->state->server, transport, state->accepting_pollset, args_copy); grpc_channel_args_destroy(args_copy); - grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); + grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL); } else { /* We need to consume this here, because the server may already have * gone away. */ @@ -128,7 +128,8 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, } static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, void *user_data, + grpc_channel_args *args, + gpr_slice_buffer *read_buffer, void *user_data, grpc_error *error) { server_secure_connect *state = user_data; if (error != GRPC_ERROR_NONE) { @@ -136,9 +137,10 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); grpc_error_free_string(error_str); GRPC_ERROR_UNREF(error); + grpc_channel_args_destroy(args); + gpr_free(read_buffer); grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr); grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); - grpc_channel_args_destroy(args); state_unref(state->state); gpr_free(state); return; @@ -150,8 +152,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, // into this function. state->args = args; grpc_server_security_connector_do_handshake( - exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline, - on_secure_handshake_done, state); + exec_ctx, state->state->sc, state->acceptor, endpoint, read_buffer, + state->deadline, on_secure_handshake_done, state); } static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 6bd65cea02..f2f5465201 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -2538,9 +2538,12 @@ grpc_transport *grpc_create_chttp2_transport( void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - gpr_slice *slices, size_t nslices) { + gpr_slice_buffer *read_buffer) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; REF_TRANSPORT(t, "reading_action"); /* matches unref inside reading_action */ - gpr_slice_buffer_addn(&t->read_buffer, slices, nslices); + if (read_buffer != NULL) { + gpr_slice_buffer_move_into(read_buffer, &t->read_buffer); + gpr_free(read_buffer); + } reading_action(exec_ctx, t, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index 5da4276f82..4e2d0954bf 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -44,8 +44,10 @@ grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, grpc_endpoint *ep, int is_client); +/// Takes ownership of \a read_buffer, which (if non-NULL) contains +/// leftover bytes previously read from the endpoint (e.g., by handshakers). void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - gpr_slice *slices, size_t nslices); + gpr_slice_buffer *read_buffer); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */ -- cgit v1.2.3