diff options
author | Robbie Shade <rjshade@google.com> | 2016-07-26 07:44:10 -0400 |
---|---|---|
committer | Robbie Shade <rjshade@google.com> | 2016-07-26 07:44:10 -0400 |
commit | 43b633d54f4708750cffe4374c9f56c65bec8fef (patch) | |
tree | 8b1813cd15a7792bfadf1275ab59773d74e71efc /src/core/ext/transport/chttp2 | |
parent | 53826511f95cbfa31e3b8316b11cf397830502b0 (diff) | |
parent | 7b104cd1c23a3e6ee3cb0809f39617ceda5e2575 (diff) |
Merge branch 'master' into move_timeout_encoding
Diffstat (limited to 'src/core/ext/transport/chttp2')
5 files changed, 138 insertions, 46 deletions
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 85f9efb3b6..645a011748 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -45,6 +45,7 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/compress_filter.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/surface/api_trace.h" @@ -63,6 +64,8 @@ typedef struct { grpc_endpoint *tcp; grpc_closure connected; + + grpc_handshake_manager *handshake_mgr; } connector; static void connector_ref(grpc_connector *con) { @@ -74,6 +77,7 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { /* c->initial_string_buffer does not need to be destroyed */ + grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); gpr_free(c); } } @@ -83,9 +87,21 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, connector_unref(exec_ctx, arg); } +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + grpc_channel_args *args, void *user_data) { + connector *c = user_data; + c->result->transport = + grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1); + GPR_ASSERT(c->result->transport); + grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); + c->result->channel_args = args; + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL); +} + static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_closure *notify; grpc_endpoint *tcp = c->tcp; if (tcp != NULL) { if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) { @@ -97,19 +113,17 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector_ref(arg); grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, &c->initial_string_sent); + } else { + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); } - c->result->transport = - grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, tcp, 1); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, - 0); - GPR_ASSERT(c->result->transport); - c->result->channel_args = grpc_channel_args_copy(c->args.channel_args); } else { memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } - notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {} @@ -171,6 +185,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel( memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); + c->handshake_mgr = grpc_handshake_manager_create(); args->args = final_args; s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 9acacbd92d..01d949add3 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -44,6 +44,7 @@ #include "src/core/ext/client_config/resolver_registry.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" @@ -69,6 +70,11 @@ typedef struct { grpc_endpoint *newly_connecting_endpoint; grpc_closure connected_closure; + + grpc_handshake_manager *handshake_mgr; + + // TODO(roth): Remove once we eliminate on_secure_handshake_done(). + grpc_channel_args *tmp_args; } connector; static void connector_ref(grpc_connector *con) { @@ -80,6 +86,8 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { /* c->initial_string_buffer does not need to be destroyed */ + grpc_channel_args_destroy(c->tmp_args); + grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); gpr_free(c); } } @@ -89,7 +97,6 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *secure_endpoint, grpc_auth_context *auth_context) { connector *c = arg; - grpc_closure *notify; gpr_mu_lock(&c->mu); grpc_error *error = GRPC_ERROR_NONE; if (c->connecting_endpoint == NULL) { @@ -110,25 +117,36 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); auth_context_arg = grpc_auth_context_to_arg(auth_context); - c->result->channel_args = grpc_channel_args_copy_and_add( - c->args.channel_args, &auth_context_arg, 1); + c->result->channel_args = + grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1); } - notify = c->notify; + grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); } +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + grpc_channel_args *args, void *user_data) { + connector *c = user_data; + // TODO(roth, jboeuf): Convert security connector handshaking to use new + // handshake API, and then move the code from on_secure_handshake_done() + // into this function. + c->tmp_args = args; + grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector, + endpoint, c->args.deadline, + on_secure_handshake_done, c); +} + static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_channel_security_connector_do_handshake( - exec_ctx, c->security_connector, c->connecting_endpoint, c->args.deadline, - on_secure_handshake_done, c); + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); } static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_closure *notify; grpc_endpoint *tcp = c->newly_connecting_endpoint; if (tcp != NULL) { gpr_mu_lock(&c->mu); @@ -144,13 +162,13 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, &c->initial_string_sent); } else { - grpc_channel_security_connector_do_handshake( - exec_ctx, c->security_connector, tcp, c->args.deadline, - on_secure_handshake_done, c); + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); } } else { memset(c->result, 0, sizeof(*c->result)); - notify = c->notify; + grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } @@ -229,6 +247,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel( memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; + c->handshake_mgr = grpc_handshake_manager_create(); gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->args = final_args; diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index e5c987925c..a33fc93382 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -37,16 +37,26 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/useful.h> + #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -static void new_transport(grpc_exec_ctx *exec_ctx, void *server, - grpc_endpoint *tcp, grpc_pollset *accepting_pollset, - grpc_tcp_server_acceptor *acceptor) { +typedef struct server_connect_state { + grpc_server *server; + grpc_pollset *accepting_pollset; + grpc_tcp_server_acceptor *acceptor; + grpc_handshake_manager *handshake_mgr; +} server_connect_state; + +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + grpc_channel_args *args, void *user_data) { + server_connect_state *state = user_data; /* * Beware that the call to grpc_create_chttp2_transport() has to happen before * grpc_tcp_server_destroy(). This is fine here, but similar code @@ -54,18 +64,40 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server, * (as in server_secure_chttp2.c) needs to add synchronization to avoid this * case. */ - grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args(server), tcp, 0); - grpc_server_setup_transport(exec_ctx, server, transport, accepting_pollset, - grpc_server_get_channel_args(server)); + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, args, endpoint, 0); + grpc_server_setup_transport(exec_ctx, state->server, transport, + state->accepting_pollset, + grpc_server_get_channel_args(state->server)); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); + // Clean up. + grpc_channel_args_destroy(args); + grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); + gpr_free(state); +} + +static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp, + grpc_pollset *accepting_pollset, + grpc_tcp_server_acceptor *acceptor) { + server_connect_state *state = gpr_malloc(sizeof(server_connect_state)); + state->server = server; + state->accepting_pollset = accepting_pollset; + state->acceptor = acceptor; + state->handshake_mgr = grpc_handshake_manager_create(); + // TODO(roth): We should really get this timeout value from channel + // args instead of hard-coding it. + const gpr_timespec deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); + grpc_handshake_manager_do_handshake( + exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server), + deadline, acceptor, on_handshake_done, state); } /* Server callback: start listening on our ports */ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_pollset **pollsets, size_t pollset_count) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, new_transport, + grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept, server); } @@ -74,6 +106,7 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_closure *destroy_done) { grpc_tcp_server *tcp = tcpp; + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); grpc_tcp_server_unref(exec_ctx, tcp); grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index c42810e913..67e5ee897c 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -42,6 +42,7 @@ #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -68,6 +69,12 @@ typedef struct server_secure_state { typedef struct server_secure_connect { server_secure_state *state; grpc_pollset *accepting_pollset; + grpc_tcp_server_acceptor *acceptor; + grpc_handshake_manager *handshake_mgr; + // TODO(roth): Remove the following two fields when we eliminate + // grpc_server_security_connector_do_handshake(). + gpr_timespec deadline; + grpc_channel_args *args; } server_secure_connect; static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); } @@ -97,13 +104,11 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, transport = grpc_create_chttp2_transport( exec_ctx, grpc_server_get_channel_args(state->state->server), secure_endpoint, 0); - grpc_channel_args *args_copy; grpc_arg args_to_add[2]; args_to_add[0] = grpc_server_credentials_to_arg(state->state->creds); args_to_add[1] = grpc_auth_context_to_arg(auth_context); - args_copy = grpc_channel_args_copy_and_add( - grpc_server_get_channel_args(state->state->server), args_to_add, - GPR_ARRAY_SIZE(args_to_add)); + grpc_channel_args *args_copy = grpc_channel_args_copy_and_add( + state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add)); grpc_server_setup_transport(exec_ctx, state->state->server, transport, state->accepting_pollset, args_copy); grpc_channel_args_destroy(args_copy); @@ -118,10 +123,25 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, } else { gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); } + grpc_channel_args_destroy(state->args); state_unref(state->state); gpr_free(state); } +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + grpc_channel_args *args, void *user_data) { + server_secure_connect *state = user_data; + // TODO(roth, jboeuf): Convert security connector handshaking to use new + // handshake API, and then move the code from on_secure_handshake_done() + // into this function. + grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); + state->handshake_mgr = NULL; + state->args = args; + grpc_server_security_connector_do_handshake( + exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline, + on_secure_handshake_done, state); +} + static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { @@ -129,11 +149,16 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, state->state = statep; state_ref(state->state); state->accepting_pollset = accepting_pollset; - grpc_server_security_connector_do_handshake( - exec_ctx, state->state->sc, acceptor, tcp, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(120, GPR_TIMESPAN)), - on_secure_handshake_done, state); + state->acceptor = acceptor; + state->handshake_mgr = grpc_handshake_manager_create(); + // TODO(roth): We should really get this timeout value from channel + // args instead of hard-coding it. + state->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(120, GPR_TIMESPAN)); + grpc_handshake_manager_do_handshake( + exec_ctx, state->handshake_mgr, tcp, + grpc_server_get_channel_args(state->state->server), state->deadline, + acceptor, on_handshake_done, state); } /* Server callback: start listening on our ports */ @@ -166,6 +191,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, state->destroy_callback = callback; tcp = state->tcp; gpr_mu_unlock(&state->mu); + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); grpc_tcp_server_unref(exec_ctx, tcp); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index a32d45c1c2..6bd65cea02 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -2005,6 +2005,7 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = arg; + grpc_error *err = GRPC_ERROR_NONE; GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, @@ -2013,15 +2014,13 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing, t->read_buffer.slices[i]); }; - if (i != t->read_buffer.count) { + if (errors[1] == GRPC_ERROR_NONE) { + err = GRPC_ERROR_REF(error); + } else { errors[2] = try_http_parsing(exec_ctx, t); + err = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, + GPR_ARRAY_SIZE(errors)); } - grpc_error *err = - errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE && - errors[2] == GRPC_ERROR_NONE - ? GRPC_ERROR_NONE - : GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, - GPR_ARRAY_SIZE(errors)); for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) { GRPC_ERROR_UNREF(errors[i]); } |