diff options
author | Mark D. Roth <roth@google.com> | 2016-12-09 13:43:44 -0800 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-12-09 13:43:44 -0800 |
commit | 389f272b49940a66e2f7876416c375754d67e571 (patch) | |
tree | acdbb750b05949012e629181e4534b2c736b2bac /src | |
parent | 39b1f913d4e4be5cbf3ba4aea989632a93cad3b7 (diff) | |
parent | a3727aa8d5a93b8d719190b13b6edea850d54af8 (diff) |
Merge remote-tracking branch 'upstream/master' into client_channel_init_cleanup
Diffstat (limited to 'src')
23 files changed, 193 insertions, 128 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index 568b114d64..58a6877b9b 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -59,8 +59,8 @@ typedef struct { bool connecting; char *server_name; - grpc_chttp2_create_handshakers_func create_handshakers; - void *create_handshakers_user_data; + grpc_chttp2_add_handshakers_func add_handshakers; + void *add_handshakers_user_data; grpc_closure *notify; grpc_connect_in_args args; @@ -160,9 +160,9 @@ static void start_handshake_locked(grpc_exec_ctx *exec_ctx, grpc_http_connect_handshaker_create(proxy_name, c->server_name)); gpr_free(proxy_name); } - if (c->create_handshakers != NULL) { - c->create_handshakers(exec_ctx, c->create_handshakers_user_data, - c->handshake_mgr); + if (c->add_handshakers != NULL) { + c->add_handshakers(exec_ctx, c->add_handshakers_user_data, + c->handshake_mgr); } grpc_handshake_manager_do_handshake( exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, @@ -255,15 +255,15 @@ static const grpc_connector_vtable chttp2_connector_vtable = { grpc_connector *grpc_chttp2_connector_create( grpc_exec_ctx *exec_ctx, const char *server_name, - grpc_chttp2_create_handshakers_func create_handshakers, - void *create_handshakers_user_data) { + grpc_chttp2_add_handshakers_func add_handshakers, + void *add_handshakers_user_data) { chttp2_connector *c = gpr_malloc(sizeof(*c)); memset(c, 0, sizeof(*c)); c->base.vtable = &chttp2_connector_vtable; gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); c->server_name = gpr_strdup(server_name); - c->create_handshakers = create_handshakers; - c->create_handshakers_user_data = create_handshakers_user_data; + c->add_handshakers = add_handshakers; + c->add_handshakers_user_data = add_handshakers_user_data; return &c->base; } diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h index 6c34ce1af1..c57fb1a9a0 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.h +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -38,15 +38,15 @@ #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/exec_ctx.h" -typedef void (*grpc_chttp2_create_handshakers_func)( +typedef void (*grpc_chttp2_add_handshakers_func)( grpc_exec_ctx* exec_ctx, void* user_data, grpc_handshake_manager* handshake_mgr); -/// If \a create_handshakers is non-NULL, it will be called with -/// \a create_handshakers_user_data to add handshakers. +/// If \a add_handshakers is non-NULL, it will be called with +/// \a add_handshakers_user_data to add handshakers. grpc_connector* grpc_chttp2_connector_create( grpc_exec_ctx* exec_ctx, const char* server_name, - grpc_chttp2_create_handshakers_func create_handshakers, - void* create_handshakers_user_data); + grpc_chttp2_add_handshakers_func add_handshakers, + void* add_handshakers_user_data); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 7325f9413d..3ad34b0870 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -54,7 +54,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { grpc_connector *connector = grpc_chttp2_connector_create( - exec_ctx, args->server_name, NULL /* create_handshakers */, + exec_ctx, args->server_name, NULL /* add_handshakers */, NULL /* user_data */); grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); grpc_connector_unref(exec_ctx, connector); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 63f2488088..2bef684398 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -68,11 +68,10 @@ static void client_channel_factory_unref( } } -static void create_handshakers(grpc_exec_ctx *exec_ctx, - void *security_connector, - grpc_handshake_manager *handshake_mgr) { - grpc_channel_security_connector_create_handshakers( - exec_ctx, security_connector, handshake_mgr); +static void add_handshakers(grpc_exec_ctx *exec_ctx, void *security_connector, + grpc_handshake_manager *handshake_mgr) { + grpc_channel_security_connector_add_handshakers(exec_ctx, security_connector, + handshake_mgr); } static grpc_subchannel *client_channel_factory_create_subchannel( @@ -80,7 +79,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel( const grpc_subchannel_args *args) { client_channel_factory *f = (client_channel_factory *)cc_factory; grpc_connector *connector = grpc_chttp2_connector_create( - exec_ctx, args->server_name, create_handshakers, f->security_connector); + exec_ctx, args->server_name, add_handshakers, f->security_connector); grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); grpc_connector_unref(exec_ctx, connector); return s; diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index 8ee7e29316..f0857714fc 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -53,13 +53,13 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -void grpc_chttp2_server_handshaker_factory_create_handshakers( +void grpc_chttp2_server_handshaker_factory_add_handshakers( grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *handshaker_factory, grpc_handshake_manager *handshake_mgr) { if (handshaker_factory != NULL) { - handshaker_factory->vtable->create_handshakers(exec_ctx, handshaker_factory, - handshake_mgr); + handshaker_factory->vtable->add_handshakers(exec_ctx, handshaker_factory, + handshake_mgr); } } @@ -139,7 +139,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, const char *error_str = grpc_error_string(error); gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); grpc_error_free_string(error_str); - if (error == GRPC_ERROR_NONE) { + if (error == GRPC_ERROR_NONE && args->endpoint != NULL) { // We were shut down after handshaking completed successfully, so // destroy the endpoint here. // TODO(ctiller): It is currently necessary to shutdown endpoints @@ -153,19 +153,26 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(args->read_buffer); } } else { - grpc_transport *transport = - grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); - grpc_server_setup_transport( - exec_ctx, connection_state->server_state->server, transport, - connection_state->accepting_pollset, args->args); - grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); - grpc_channel_args_destroy(args->args); + // If the handshaking succeeded but there is no endpoint, then the + // handshaker may have handed off the connection to some external + // code, so we can just clean up here without creating a transport. + if (args->endpoint != NULL) { + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); + grpc_server_setup_transport( + exec_ctx, connection_state->server_state->server, transport, + connection_state->accepting_pollset, args->args); + grpc_chttp2_transport_start_reading(exec_ctx, transport, + args->read_buffer); + grpc_channel_args_destroy(args->args); + } } pending_handshake_manager_remove_locked(connection_state->server_state, connection_state->handshake_mgr); gpr_mu_unlock(&connection_state->server_state->mu); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); + gpr_free(connection_state->acceptor); gpr_free(connection_state); } @@ -177,6 +184,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, if (state->shutdown) { gpr_mu_unlock(&state->mu); grpc_endpoint_destroy(exec_ctx, tcp); + gpr_free(acceptor); return; } grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create(); @@ -189,7 +197,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, connection_state->accepting_pollset = accepting_pollset; connection_state->acceptor = acceptor; connection_state->handshake_mgr = handshake_mgr; - grpc_chttp2_server_handshaker_factory_create_handshakers( + grpc_chttp2_server_handshaker_factory_add_handshakers( exec_ctx, state->handshaker_factory, connection_state->handshake_mgr); // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h index 3073399267..aa364b565d 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.h +++ b/src/core/ext/transport/chttp2/server/chttp2_server.h @@ -45,7 +45,7 @@ typedef struct grpc_chttp2_server_handshaker_factory grpc_chttp2_server_handshaker_factory; typedef struct { - void (*create_handshakers)( + void (*add_handshakers)( grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *handshaker_factory, grpc_handshake_manager *handshake_mgr); @@ -57,7 +57,7 @@ struct grpc_chttp2_server_handshaker_factory { const grpc_chttp2_server_handshaker_factory_vtable *vtable; }; -void grpc_chttp2_server_handshaker_factory_create_handshakers( +void grpc_chttp2_server_handshaker_factory_add_handshakers( grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *handshaker_factory, grpc_handshake_manager *handshake_mgr); diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 85c21f0ca2..a33a7a3f7d 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -54,12 +54,12 @@ typedef struct { grpc_server_security_connector *security_connector; } server_security_handshaker_factory; -static void server_security_handshaker_factory_create_handshakers( +static void server_security_handshaker_factory_add_handshakers( grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf, grpc_handshake_manager *handshake_mgr) { server_security_handshaker_factory *handshaker_factory = (server_security_handshaker_factory *)hf; - grpc_server_security_connector_create_handshakers( + grpc_server_security_connector_add_handshakers( exec_ctx, handshaker_factory->security_connector, handshake_mgr); } @@ -74,7 +74,7 @@ static void server_security_handshaker_factory_destroy( static const grpc_chttp2_server_handshaker_factory_vtable server_security_handshaker_factory_vtable = { - server_security_handshaker_factory_create_handshakers, + server_security_handshaker_factory_add_handshakers, server_security_handshaker_factory_destroy}; int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 90626dc2d1..23edc826ca 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -49,21 +49,21 @@ void grpc_handshaker_init(const grpc_handshaker_vtable* vtable, handshaker->vtable = vtable; } -static void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker) { +void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) { handshaker->vtable->destroy(exec_ctx, handshaker); } -static void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker) { +void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) { handshaker->vtable->shutdown(exec_ctx, handshaker); } -static void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker, - grpc_tcp_server_acceptor* acceptor, - grpc_closure* on_handshake_done, - grpc_handshaker_args* args) { +void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, + grpc_tcp_server_acceptor* acceptor, + grpc_closure* on_handshake_done, + grpc_handshaker_args* args) { handshaker->vtable->do_handshake(exec_ctx, handshaker, acceptor, on_handshake_done, args); } @@ -157,10 +157,11 @@ static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_error* error) { GPR_ASSERT(mgr->index <= mgr->count); - // If we got an error or we've been shut down or we've finished the last - // handshaker, invoke the on_handshake_done callback. Otherwise, call the - // next handshaker. - if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->index == mgr->count) { + // If we got an error or we've been shut down or we're exiting early or + // we've finished the last handshaker, invoke the on_handshake_done + // callback. Otherwise, call the next handshaker. + if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->args.exit_early || + mgr->index == mgr->count) { // Cancel deadline timer, since we're invoking the on_handshake_done // callback now. grpc_timer_cancel(exec_ctx, &mgr->deadline_timer); diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index ebbc1ff7f3..450b7adaee 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -72,6 +72,9 @@ typedef struct { grpc_endpoint* endpoint; grpc_channel_args* args; grpc_slice_buffer* read_buffer; + // A handshaker may set this to true before invoking on_handshake_done + // to indicate that subsequent handshakers should be skipped. + bool exit_early; // User data passed through the handshake manager. Not used by // individual handshakers. void* user_data; @@ -105,6 +108,16 @@ struct grpc_handshaker { void grpc_handshaker_init(const grpc_handshaker_vtable* vtable, grpc_handshaker* handshaker); +void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker); +void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker); +void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, + grpc_tcp_server_acceptor* acceptor, + grpc_closure* on_handshake_done, + grpc_handshaker_args* args); + /// /// grpc_handshake_manager /// diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 0ab34d00e4..14cdb1dab3 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -60,9 +60,9 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) { gpr_free(sc); } -static void httpcli_ssl_create_handshakers( - grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, - grpc_handshake_manager *handshake_mgr) { +static void httpcli_ssl_add_handshakers(grpc_exec_ctx *exec_ctx, + grpc_channel_security_connector *sc, + grpc_handshake_manager *handshake_mgr) { grpc_httpcli_ssl_channel_security_connector *c = (grpc_httpcli_ssl_channel_security_connector *)sc; tsi_handshaker *handshaker = NULL; @@ -74,8 +74,9 @@ static void httpcli_ssl_create_handshakers( tsi_result_to_string(result)); } } - grpc_security_create_handshakers(exec_ctx, handshaker, &sc->base, - handshake_mgr); + grpc_handshake_manager_add( + handshake_mgr, + grpc_security_handshaker_create(exec_ctx, handshaker, &sc->base)); } static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, @@ -132,7 +133,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( *sc = NULL; return GRPC_SECURITY_ERROR; } - c->base.create_handshakers = httpcli_ssl_create_handshakers; + c->base.add_handshakers = httpcli_ssl_add_handshakers; *sc = &c->base; return GRPC_SECURITY_OK; } @@ -185,8 +186,8 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(httpcli_ssl_channel_security_connector_create( pem_root_certs, pem_root_certs_size, host, &sc) == GRPC_SECURITY_OK); - grpc_channel_security_connector_create_handshakers(exec_ctx, sc, - c->handshake_mgr); + grpc_channel_security_connector_add_handshakers(exec_ctx, sc, + c->handshake_mgr); grpc_handshake_manager_do_handshake( exec_ctx, c->handshake_mgr, tcp, NULL /* channel_args */, deadline, NULL /* acceptor */, on_handshake_done, c /* user_data */); diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index fd80779519..540305e4fa 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -373,6 +373,11 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; return false; + } else if (errno == EPIPE) { + *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + return true; } else { *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); return true; diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 6eba8c4057..437a94beff 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -52,7 +52,8 @@ typedef struct grpc_tcp_server_acceptor { unsigned fd_index; } grpc_tcp_server_acceptor; -/* Called for newly connected TCP connections. */ +/* Called for newly connected TCP connections. + Takes ownership of acceptor. */ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *ep, grpc_pollset *accepting_pollset, diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 7e2fb0f1f9..179f47ef76 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -381,16 +381,12 @@ error: /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_tcp_listener *sp = arg; - grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, - sp->fd_index}; - grpc_pollset *read_notifier_pollset = NULL; - grpc_fd *fdobj; if (err != GRPC_ERROR_NONE) { goto error; } - read_notifier_pollset = + grpc_pollset *read_notifier_pollset = sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add( &sp->server->next_pollset_to_assign, 1) % sp->server->pollset_count]; @@ -426,7 +422,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str); } - fdobj = grpc_fd_create(fd, name); + grpc_fd *fdobj = grpc_fd_create(fd, name); if (read_notifier_pollset == NULL) { gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd"); @@ -435,11 +431,17 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj); + // Create acceptor. + grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); + acceptor->from_server = sp->server; + acceptor->port_index = sp->port_index; + acceptor->fd_index = sp->fd_index; + sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, grpc_tcp_create(fdobj, sp->server->resource_quota, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), - read_notifier_pollset, &acceptor); + read_notifier_pollset, acceptor); gpr_free(name); gpr_free(addr_str); diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index b5b9b92a20..e1a174cfa2 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -188,7 +188,6 @@ static void accepted_connection_close_cb(uv_handle_t *handle) { static void on_connect(uv_stream_t *server, int status) { grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; - grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0}; uv_tcp_t *client; grpc_endpoint *ep = NULL; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -201,6 +200,7 @@ static void on_connect(uv_stream_t *server, int status) { uv_strerror(status)); return; } + client = gpr_malloc(sizeof(uv_tcp_t)); uv_tcp_init(uv_default_loop(), client); // UV documentation says this is guaranteed to succeed @@ -220,8 +220,13 @@ static void on_connect(uv_stream_t *server, int status) { gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); } ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); + // Create acceptor. + grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); + acceptor->from_server = sp->server; + acceptor->port_index = sp->port_index; + acceptor->fd_index = 0; sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, - &acceptor); + acceptor); grpc_exec_ctx_finish(&exec_ctx); } } diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index b8a391c059..b0c8586bac 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -323,7 +323,6 @@ failure: /* Event manager callback when reads are ready. */ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_tcp_listener *sp = arg; - grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0}; SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; @@ -396,8 +395,13 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* The only time we should call our callback, is where we successfully managed to accept a connection, and created an endpoint. */ if (ep) { + // Create acceptor. + grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); + acceptor->from_server = sp->server; + acceptor->port_index = sp->port_index; + acceptor->fd_index = 0; sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, - &acceptor); + acceptor); } /* As we were notified from the IOCP of one and exactly one accept, the former socked we created has now either been destroy or assigned diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index a2e0c7c7c7..5b088aa58d 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -43,6 +43,7 @@ #include <grpc/support/string_util.h> #include "src/core/ext/transport/chttp2/alpn/alpn.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/load_file.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" @@ -111,19 +112,19 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer, return NULL; } -void grpc_channel_security_connector_create_handshakers( +void grpc_channel_security_connector_add_handshakers( grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector, grpc_handshake_manager *handshake_mgr) { if (connector != NULL) { - connector->create_handshakers(exec_ctx, connector, handshake_mgr); + connector->add_handshakers(exec_ctx, connector, handshake_mgr); } } -void grpc_server_security_connector_create_handshakers( +void grpc_server_security_connector_add_handshakers( grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector, grpc_handshake_manager *handshake_mgr) { if (connector != NULL) { - connector->create_handshakers(exec_ctx, connector, handshake_mgr); + connector->add_handshakers(exec_ctx, connector, handshake_mgr); } } @@ -285,20 +286,24 @@ static void fake_channel_check_call_host(grpc_exec_ctx *exec_ctx, cb(exec_ctx, user_data, GRPC_SECURITY_OK); } -static void fake_channel_create_handshakers( +static void fake_channel_add_handshakers( grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, grpc_handshake_manager *handshake_mgr) { - grpc_security_create_handshakers( - exec_ctx, tsi_create_fake_handshaker(true /* is_client */), &sc->base, - handshake_mgr); + grpc_handshake_manager_add( + handshake_mgr, + grpc_security_handshaker_create( + exec_ctx, tsi_create_fake_handshaker(true /* is_client */), + &sc->base)); } -static void fake_server_create_handshakers( - grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, - grpc_handshake_manager *handshake_mgr) { - grpc_security_create_handshakers( - exec_ctx, tsi_create_fake_handshaker(false /* is_client */), &sc->base, - handshake_mgr); +static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx, + grpc_server_security_connector *sc, + grpc_handshake_manager *handshake_mgr) { + grpc_handshake_manager_add( + handshake_mgr, + grpc_security_handshaker_create( + exec_ctx, tsi_create_fake_handshaker(false /* is_client */), + &sc->base)); } static grpc_security_connector_vtable fake_channel_vtable = { @@ -316,7 +321,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( c->base.vtable = &fake_channel_vtable; c->request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->check_call_host = fake_channel_check_call_host; - c->create_handshakers = fake_channel_create_handshakers; + c->add_handshakers = fake_channel_add_handshakers; return c; } @@ -328,7 +333,7 @@ grpc_server_security_connector *grpc_fake_server_security_connector_create( gpr_ref_init(&c->base.refcount, 1); c->base.vtable = &fake_server_vtable; c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; - c->create_handshakers = fake_server_create_handshakers; + c->add_handshakers = fake_server_add_handshakers; return c; } @@ -382,9 +387,9 @@ static grpc_security_status ssl_create_handshaker( return GRPC_SECURITY_OK; } -static void ssl_channel_create_handshakers( - grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, - grpc_handshake_manager *handshake_mgr) { +static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx, + grpc_channel_security_connector *sc, + grpc_handshake_manager *handshake_mgr) { grpc_ssl_channel_security_connector *c = (grpc_ssl_channel_security_connector *)sc; // Instantiate TSI handshaker. @@ -395,12 +400,13 @@ static void ssl_channel_create_handshakers( : c->target_name, &tsi_hs); // Create handshakers. - grpc_security_create_handshakers(exec_ctx, tsi_hs, &sc->base, handshake_mgr); + grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create( + exec_ctx, tsi_hs, &sc->base)); } -static void ssl_server_create_handshakers( - grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, - grpc_handshake_manager *handshake_mgr) { +static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx, + grpc_server_security_connector *sc, + grpc_handshake_manager *handshake_mgr) { grpc_ssl_server_security_connector *c = (grpc_ssl_server_security_connector *)sc; // Instantiate TSI handshaker. @@ -408,7 +414,8 @@ static void ssl_server_create_handshakers( ssl_create_handshaker(c->handshaker_factory, false /* is_client */, NULL /* peer_name */, &tsi_hs); // Create handshakers. - grpc_security_create_handshakers(exec_ctx, tsi_hs, &sc->base, handshake_mgr); + grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create( + exec_ctx, tsi_hs, &sc->base)); } static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) { @@ -708,7 +715,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create( c->base.request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->base.check_call_host = ssl_channel_check_call_host; - c->base.create_handshakers = ssl_channel_create_handshakers; + c->base.add_handshakers = ssl_channel_add_handshakers; gpr_split_host_port(target_name, &c->target_name, &port); gpr_free(port); if (overridden_target_name != NULL) { @@ -783,7 +790,7 @@ grpc_security_status grpc_ssl_server_security_connector_create( *sc = NULL; goto error; } - c->base.create_handshakers = ssl_server_create_handshakers; + c->base.add_handshakers = ssl_server_add_handshakers; *sc = &c->base; gpr_free((void *)alpn_protocol_strings); gpr_free(alpn_protocol_string_lengths); diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index 696db0e02e..a84b359051 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -98,7 +98,7 @@ void grpc_security_connector_unref(grpc_security_connector *policy); #endif /* Check the peer. Callee takes ownership of the peer object. - Sets *auth_context and invokes on_peer_checked when done. */ + When done, sets *auth_context and invokes on_peer_checked. */ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, @@ -133,9 +133,9 @@ struct grpc_channel_security_connector { grpc_channel_security_connector *sc, const char *host, grpc_auth_context *auth_context, grpc_security_call_host_check_cb cb, void *user_data); - void (*create_handshakers)(grpc_exec_ctx *exec_ctx, - grpc_channel_security_connector *sc, - grpc_handshake_manager *handshake_mgr); + void (*add_handshakers)(grpc_exec_ctx *exec_ctx, + grpc_channel_security_connector *sc, + grpc_handshake_manager *handshake_mgr); }; /* Checks that the host that will be set for a call is acceptable. */ @@ -145,7 +145,7 @@ void grpc_channel_security_connector_check_call_host( grpc_security_call_host_check_cb cb, void *user_data); /* Registers handshakers with \a handshake_mgr. */ -void grpc_channel_security_connector_create_handshakers( +void grpc_channel_security_connector_add_handshakers( grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector, grpc_handshake_manager *handshake_mgr); @@ -158,12 +158,12 @@ typedef struct grpc_server_security_connector grpc_server_security_connector; struct grpc_server_security_connector { grpc_security_connector base; - void (*create_handshakers)(grpc_exec_ctx *exec_ctx, - grpc_server_security_connector *sc, - grpc_handshake_manager *handshake_mgr); + void (*add_handshakers)(grpc_exec_ctx *exec_ctx, + grpc_server_security_connector *sc, + grpc_handshake_manager *handshake_mgr); }; -void grpc_server_security_connector_create_handshakers( +void grpc_server_security_connector_add_handshakers( grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, grpc_handshake_manager *handshake_mgr); diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index fc01bec2f2..41a775db85 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -131,6 +131,9 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx, // Not shutting down, so the write failed. Clean up before // invoking the callback. cleanup_args_for_failure_locked(h); + // Set shutdown to true so that subsequent calls to + // security_handshaker_shutdown() do nothing. + h->shutdown = true; } // Invoke callback. grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, error, NULL); @@ -434,17 +437,14 @@ static grpc_handshaker *fail_handshaker_create() { // exported functions // -void grpc_security_create_handshakers(grpc_exec_ctx *exec_ctx, - tsi_handshaker *handshaker, - grpc_security_connector *connector, - grpc_handshake_manager *handshake_mgr) { - // If no TSI handshaker was created, add a handshaker that always fails. - // Otherwise, add a real security handshaker. +grpc_handshaker *grpc_security_handshaker_create( + grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker, + grpc_security_connector *connector) { + // If no TSI handshaker was created, return a handshaker that always fails. + // Otherwise, return a real security handshaker. if (handshaker == NULL) { - grpc_handshake_manager_add(handshake_mgr, fail_handshaker_create()); + return fail_handshaker_create(); } else { - grpc_handshake_manager_add( - handshake_mgr, - security_handshaker_create(exec_ctx, handshaker, connector)); + return security_handshaker_create(exec_ctx, handshaker, connector); } } diff --git a/src/core/lib/security/transport/security_handshaker.h b/src/core/lib/security/transport/security_handshaker.h index f71f43a359..5ddbf4b451 100644 --- a/src/core/lib/security/transport/security_handshaker.h +++ b/src/core/lib/security/transport/security_handshaker.h @@ -34,14 +34,13 @@ #ifndef GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H #define GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H -#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/security/transport/security_connector.h" -/// Creates any necessary security handshakers and adds them to -/// \a handshake_mgr. -void grpc_security_create_handshakers(grpc_exec_ctx *exec_ctx, - tsi_handshaker *handshaker, - grpc_security_connector *connector, - grpc_handshake_manager *handshake_mgr); +/// Creates a security handshaker using \a handshaker. +grpc_handshaker *grpc_security_handshaker_create( + grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker, + grpc_security_connector *connector); #endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H */ diff --git a/src/core/lib/support/string.h b/src/core/lib/support/string.h index e933e2eb46..db59308425 100644 --- a/src/core/lib/support/string.h +++ b/src/core/lib/support/string.h @@ -36,8 +36,6 @@ #include <stddef.h> -#include <grpc/slice.h> -#include <grpc/slice_buffer.h> #include <grpc/support/port_platform.h> #ifdef __cplusplus diff --git a/src/core/lib/support/tmpfile.h b/src/core/lib/support/tmpfile.h index 8952e5ec3d..f613cf9bc8 100644 --- a/src/core/lib/support/tmpfile.h +++ b/src/core/lib/support/tmpfile.h @@ -36,8 +36,6 @@ #include <stdio.h> -#include <grpc/slice.h> - #ifdef __cplusplus extern "C" { #endif diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 6087276d51..1c8508bbde 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -849,6 +849,26 @@ class GenericRpcHandler(six.with_metaclass(abc.ABCMeta)): raise NotImplementedError() +class ServiceRpcHandler(six.with_metaclass(abc.ABCMeta, GenericRpcHandler)): + """An implementation of RPC methods belonging to a service. + + A service handles RPC methods with structured names of the form + '/Service.Name/Service.MethodX', where 'Service.Name' is the value + returned by service_name(), and 'Service.MethodX' is the service method + name. A service can have multiple service methods names, but only a single + service name. + """ + + @abc.abstractmethod + def service_name(self): + """Returns this services name. + + Returns: + The service name. + """ + raise NotImplementedError() + + ############################# Server Interface ############################### diff --git a/src/python/grpcio/grpc/_utilities.py b/src/python/grpcio/grpc/_utilities.py index 4850967fbc..a375896e6e 100644 --- a/src/python/grpcio/grpc/_utilities.py +++ b/src/python/grpcio/grpc/_utilities.py @@ -53,13 +53,17 @@ class RpcMethodHandler( pass -class DictionaryGenericHandler(grpc.GenericRpcHandler): +class DictionaryGenericHandler(grpc.ServiceRpcHandler): def __init__(self, service, method_handlers): + self._name = service self._method_handlers = { _common.fully_qualified_method(service, method): method_handler for method, method_handler in six.iteritems(method_handlers)} + def service_name(self): + return self._name + def service(self, handler_call_details): return self._method_handlers.get(handler_call_details.method) |