diff options
Diffstat (limited to 'src/core')
45 files changed, 1400 insertions, 467 deletions
diff --git a/src/core/ext/client_config/channel_connectivity.c b/src/core/ext/client_config/channel_connectivity.c index c1220e3a8c..ce3c13a4ee 100644 --- a/src/core/ext/client_config/channel_connectivity.c +++ b/src/core/ext/client_config/channel_connectivity.c @@ -59,7 +59,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( } gpr_log(GPR_ERROR, "grpc_channel_check_connectivity_state called on something that is " - "not a (u)client channel, but '%s'", + "not a client channel, but '%s'", client_channel_elem->filter->name); grpc_exec_ctx_finish(&exec_ctx); return GRPC_CHANNEL_SHUTDOWN; diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 85f9efb3b6..6f6855584a 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -45,6 +45,7 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/compress_filter.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/surface/api_trace.h" @@ -63,6 +64,8 @@ typedef struct { grpc_endpoint *tcp; grpc_closure connected; + + grpc_handshake_manager *handshake_mgr; } connector; static void connector_ref(grpc_connector *con) { @@ -74,6 +77,7 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { /* c->initial_string_buffer does not need to be destroyed */ + grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); gpr_free(c); } } @@ -83,9 +87,22 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, connector_unref(exec_ctx, arg); } +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + grpc_channel_args *args, void *user_data, + grpc_error *error) { + connector *c = user_data; + c->result->transport = + grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1); + GPR_ASSERT(c->result->transport); + grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); + c->result->channel_args = args; + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); +} + static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_closure *notify; grpc_endpoint *tcp = c->tcp; if (tcp != NULL) { if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) { @@ -97,19 +114,17 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector_ref(arg); grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, &c->initial_string_sent); + } else { + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); } - c->result->transport = - grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, tcp, 1); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, - 0); - GPR_ASSERT(c->result->transport); - c->result->channel_args = grpc_channel_args_copy(c->args.channel_args); } else { memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } - notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {} @@ -171,6 +186,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel( memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); + c->handshake_mgr = grpc_handshake_manager_create(); args->args = final_args; s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 721ba82d8f..4e33b6fa61 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -44,6 +44,7 @@ #include "src/core/ext/client_config/resolver_registry.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" @@ -69,6 +70,11 @@ typedef struct { grpc_endpoint *newly_connecting_endpoint; grpc_closure connected_closure; + + grpc_handshake_manager *handshake_mgr; + + // TODO(roth): Remove once we eliminate on_secure_handshake_done(). + grpc_channel_args *tmp_args; } connector; static void connector_ref(grpc_connector *con) { @@ -80,6 +86,8 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { /* c->initial_string_buffer does not need to be destroyed */ + grpc_channel_args_destroy(c->tmp_args); + grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); gpr_free(c); } } @@ -89,13 +97,14 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *secure_endpoint, grpc_auth_context *auth_context) { connector *c = arg; - grpc_closure *notify; gpr_mu_lock(&c->mu); + grpc_error *error = GRPC_ERROR_NONE; if (c->connecting_endpoint == NULL) { memset(c->result, 0, sizeof(*c->result)); gpr_mu_unlock(&c->mu); } else if (status != GRPC_SECURITY_OK) { - gpr_log(GPR_ERROR, "Secure handshake failed with error %d.", status); + error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"), + GRPC_ERROR_INT_SECURITY_STATUS, status); memset(c->result, 0, sizeof(*c->result)); c->connecting_endpoint = NULL; gpr_mu_unlock(&c->mu); @@ -108,25 +117,43 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); auth_context_arg = grpc_auth_context_to_arg(auth_context); - c->result->channel_args = grpc_channel_args_copy_and_add( - c->args.channel_args, &auth_context_arg, 1); + c->result->channel_args = + grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1); } - notify = c->notify; + grpc_closure *notify = c->notify; c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); +} + +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + grpc_channel_args *args, void *user_data, + grpc_error *error) { + connector *c = user_data; + if (error != GRPC_ERROR_NONE) { + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + } else { + // TODO(roth, jboeuf): Convert security connector handshaking to use new + // handshake API, and then move the code from on_secure_handshake_done() + // into this function. + c->tmp_args = args; + grpc_channel_security_connector_do_handshake( + exec_ctx, c->security_connector, endpoint, c->args.deadline, + on_secure_handshake_done, c); + } } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_channel_security_connector_do_handshake( - exec_ctx, c->security_connector, c->connecting_endpoint, c->args.deadline, - on_secure_handshake_done, c); + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); } static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_closure *notify; grpc_endpoint *tcp = c->newly_connecting_endpoint; if (tcp != NULL) { gpr_mu_lock(&c->mu); @@ -142,13 +169,13 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, &c->initial_string_sent); } else { - grpc_channel_security_connector_do_handshake( - exec_ctx, c->security_connector, tcp, c->args.deadline, - on_secure_handshake_done, c); + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); } } else { memset(c->result, 0, sizeof(*c->result)); - notify = c->notify; + grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } @@ -227,6 +254,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel( memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; + c->handshake_mgr = grpc_handshake_manager_create(); gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->args = final_args; diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index e5c987925c..9cd374777e 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -37,35 +37,74 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/useful.h> + #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -static void new_transport(grpc_exec_ctx *exec_ctx, void *server, - grpc_endpoint *tcp, grpc_pollset *accepting_pollset, - grpc_tcp_server_acceptor *acceptor) { - /* - * Beware that the call to grpc_create_chttp2_transport() has to happen before - * grpc_tcp_server_destroy(). This is fine here, but similar code - * asynchronously doing a handshake instead of calling grpc_tcp_server_start() - * (as in server_secure_chttp2.c) needs to add synchronization to avoid this - * case. - */ - grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args(server), tcp, 0); - grpc_server_setup_transport(exec_ctx, server, transport, accepting_pollset, - grpc_server_get_channel_args(server)); - grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); +typedef struct server_connect_state { + grpc_server *server; + grpc_pollset *accepting_pollset; + grpc_tcp_server_acceptor *acceptor; + grpc_handshake_manager *handshake_mgr; +} server_connect_state; + +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + grpc_channel_args *args, void *user_data, + grpc_error *error) { + server_connect_state *state = user_data; + if (error != GRPC_ERROR_NONE) { + const char *error_str = grpc_error_string(error); + gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); + grpc_error_free_string(error_str); + GRPC_ERROR_UNREF(error); + grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr); + } else { + // Beware that the call to grpc_create_chttp2_transport() has to happen + // before grpc_tcp_server_destroy(). This is fine here, but similar code + // asynchronously doing a handshake instead of calling + // grpc_tcp_server_start() (as in server_secure_chttp2.c) needs to add + // synchronization to avoid this case. + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, args, endpoint, 0); + grpc_server_setup_transport(exec_ctx, state->server, transport, + state->accepting_pollset, + grpc_server_get_channel_args(state->server)); + grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); + } + // Clean up. + grpc_channel_args_destroy(args); + grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); + gpr_free(state); +} + +static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp, + grpc_pollset *accepting_pollset, + grpc_tcp_server_acceptor *acceptor) { + server_connect_state *state = gpr_malloc(sizeof(server_connect_state)); + state->server = server; + state->accepting_pollset = accepting_pollset; + state->acceptor = acceptor; + state->handshake_mgr = grpc_handshake_manager_create(); + // TODO(roth): We should really get this timeout value from channel + // args instead of hard-coding it. + const gpr_timespec deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); + grpc_handshake_manager_do_handshake( + exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server), + deadline, acceptor, on_handshake_done, state); } /* Server callback: start listening on our ports */ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_pollset **pollsets, size_t pollset_count) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, new_transport, + grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept, server); } @@ -74,6 +113,7 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_closure *destroy_done) { grpc_tcp_server *tcp = tcpp; + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); grpc_tcp_server_unref(exec_ctx, tcp); grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index c42810e913..ccea15a648 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -42,6 +42,7 @@ #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -58,7 +59,7 @@ typedef struct server_secure_state { grpc_tcp_server *tcp; grpc_server_security_connector *sc; grpc_server_credentials *creds; - int is_shutdown; + bool is_shutdown; gpr_mu mu; gpr_refcount refcount; grpc_closure destroy_closure; @@ -68,6 +69,12 @@ typedef struct server_secure_state { typedef struct server_secure_connect { server_secure_state *state; grpc_pollset *accepting_pollset; + grpc_tcp_server_acceptor *acceptor; + grpc_handshake_manager *handshake_mgr; + // TODO(roth): Remove the following two fields when we eliminate + // grpc_server_security_connector_do_handshake(). + gpr_timespec deadline; + grpc_channel_args *args; } server_secure_connect; static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); } @@ -89,21 +96,18 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *secure_endpoint, grpc_auth_context *auth_context) { server_secure_connect *state = statep; - grpc_transport *transport; if (status == GRPC_SECURITY_OK) { if (secure_endpoint) { gpr_mu_lock(&state->state->mu); if (!state->state->is_shutdown) { - transport = grpc_create_chttp2_transport( + grpc_transport *transport = grpc_create_chttp2_transport( exec_ctx, grpc_server_get_channel_args(state->state->server), secure_endpoint, 0); - grpc_channel_args *args_copy; grpc_arg args_to_add[2]; args_to_add[0] = grpc_server_credentials_to_arg(state->state->creds); args_to_add[1] = grpc_auth_context_to_arg(auth_context); - args_copy = grpc_channel_args_copy_and_add( - grpc_server_get_channel_args(state->state->server), args_to_add, - GPR_ARRAY_SIZE(args_to_add)); + grpc_channel_args *args_copy = grpc_channel_args_copy_and_add( + state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add)); grpc_server_setup_transport(exec_ctx, state->state->server, transport, state->accepting_pollset, args_copy); grpc_channel_args_destroy(args_copy); @@ -118,10 +122,38 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, } else { gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); } + grpc_channel_args_destroy(state->args); state_unref(state->state); gpr_free(state); } +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + grpc_channel_args *args, void *user_data, + grpc_error *error) { + server_secure_connect *state = user_data; + if (error != GRPC_ERROR_NONE) { + const char *error_str = grpc_error_string(error); + gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); + grpc_error_free_string(error_str); + GRPC_ERROR_UNREF(error); + grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr); + grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); + grpc_channel_args_destroy(args); + state_unref(state->state); + gpr_free(state); + return; + } + grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); + state->handshake_mgr = NULL; + // TODO(roth, jboeuf): Convert security connector handshaking to use new + // handshake API, and then move the code from on_secure_handshake_done() + // into this function. + state->args = args; + grpc_server_security_connector_do_handshake( + exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline, + on_secure_handshake_done, state); +} + static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { @@ -129,11 +161,16 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, state->state = statep; state_ref(state->state); state->accepting_pollset = accepting_pollset; - grpc_server_security_connector_do_handshake( - exec_ctx, state->state->sc, acceptor, tcp, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(120, GPR_TIMESPAN)), - on_secure_handshake_done, state); + state->acceptor = acceptor; + state->handshake_mgr = grpc_handshake_manager_create(); + // TODO(roth): We should really get this timeout value from channel + // args instead of hard-coding it. + state->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(120, GPR_TIMESPAN)); + grpc_handshake_manager_do_handshake( + exec_ctx, state->handshake_mgr, tcp, + grpc_server_get_channel_args(state->state->server), state->deadline, + acceptor, on_handshake_done, state); } /* Server callback: start listening on our ports */ @@ -162,10 +199,11 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, server_secure_state *state = statep; grpc_tcp_server *tcp; gpr_mu_lock(&state->mu); - state->is_shutdown = 1; + state->is_shutdown = true; state->destroy_callback = callback; tcp = state->tcp; gpr_mu_unlock(&state->mu); + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); grpc_tcp_server_unref(exec_ctx, tcp); } @@ -226,7 +264,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, state->tcp = tcp; state->sc = sc; state->creds = grpc_server_credentials_ref(creds); - state->is_shutdown = 0; + state->is_shutdown = false; gpr_mu_init(&state->mu); gpr_ref_init(&state->refcount, 1); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c index bd87253ed3..7d5279b9da 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c @@ -36,11 +36,14 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/metadata.h" +extern int grpc_http_write_state_trace; + void grpc_chttp2_plugin_init(void) { grpc_chttp2_base64_encode_and_huffman_compress = grpc_chttp2_base64_encode_and_huffman_compress_impl; grpc_register_tracer("http", &grpc_http_trace); grpc_register_tracer("flowctl", &grpc_flowctl_trace); + grpc_register_tracer("http_write_state", &grpc_http_write_state_trace); } void grpc_chttp2_plugin_shutdown(void) {} diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 5aae753c07..d050467a02 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -48,6 +48,7 @@ #include "src/core/ext/transport/chttp2/transport/status_conversion.h" #include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" @@ -60,9 +61,9 @@ #define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) #define MAX_CLIENT_STREAM_ID 0x7fffffffu - int grpc_http_trace = 0; int grpc_flowctl_trace = 0; +int grpc_http_write_state_trace = 0; #define TRANSPORT_FROM_WRITING(tw) \ ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \ @@ -88,10 +89,16 @@ static const grpc_transport_vtable vtable; static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); +static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); + +static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); +static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_error *error); /** Set a transport level setting, and push it to our peer */ -static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, - uint32_t value); +static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_setting_id id, uint32_t value); /** Start disconnection chain */ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -137,7 +144,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); static void incoming_byte_stream_update_flow_control( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already); static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, @@ -201,6 +208,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, gpr_free(t); } +/*#define REFCOUNTING_DEBUG 1*/ #ifdef REFCOUNTING_DEBUG #define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__) #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__) @@ -231,7 +239,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, const grpc_channel_args *channel_args, - grpc_endpoint *ep, uint8_t is_client) { + grpc_endpoint *ep, bool is_client) { size_t i; int j; @@ -273,6 +281,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->writing_action, writing_action, t); grpc_closure_init(&t->reading_action, reading_action, t); grpc_closure_init(&t->parsing_action, parsing_action, t); + grpc_closure_init(&t->initiate_writing, initiate_writing, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); @@ -286,6 +295,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_slice_buffer_add( &t->global.qbuf, gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write"); } /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet @@ -311,11 +321,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* configure http2 the way we like it */ if (is_client) { - push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } - push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW); - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + DEFAULT_WINDOW); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); if (channel_args) { @@ -329,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be an integer", GRPC_ARG_MAX_CONCURRENT_STREAMS); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, (uint32_t)channel_args->args[i].value.integer); } } else if (0 == strcmp(channel_args->args[i].key, @@ -368,7 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be non-negative", GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, (uint32_t)channel_args->args[i].value.integer); } } else if (0 == strcmp(channel_args->args[i].key, @@ -393,7 +404,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be non-negative", GRPC_ARG_MAX_METADATA_SIZE); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, (uint32_t)channel_args->args[i].value.integer); } } @@ -444,6 +455,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { if (!t->closed) { + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "W:%p close transport", t); + } t->closed = 1; connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); @@ -590,7 +604,8 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer_destroy( &s->global.received_trailing_metadata); gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer); - GRPC_ERROR_UNREF(s->global.removal_error); + GRPC_ERROR_UNREF(s->global.read_closed_error); + GRPC_ERROR_UNREF(s->global.write_closed_error); UNREF_TRANSPORT(exec_ctx, t, "stream"); @@ -634,6 +649,36 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( * LOCK MANAGEMENT */ +static const char *write_state_name(grpc_chttp2_write_state state) { + switch (state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + return "INACTIVE"; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + return "REQUESTED[p=0]"; + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + return "REQUESTED[p=1]"; + case GRPC_CHTTP2_WRITE_SCHEDULED: + return "SCHEDULED"; + case GRPC_CHTTP2_WRITING: + return "WRITING"; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + return "WRITING[p=1]"; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + return "WRITING[p=0]"; + } + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} + +static void set_write_state(grpc_chttp2_transport *t, + grpc_chttp2_write_state state, const char *reason) { + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "W:%p %s -> %s because %s", t, + write_state_name(t->executor.write_state), write_state_name(state), + reason); + } + t->executor.write_state = state; +} + static void finish_global_actions(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_executor_action_header *hdr; @@ -642,13 +687,6 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("finish_global_actions", 0); for (;;) { - if (!t->executor.writing_active && !t->closed && - grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { - t->executor.writing_active = 1; - REF_TRANSPORT(t, "writing"); - prevent_endpoint_shutdown(t); - grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); - } check_read_ops(exec_ctx, &t->global); gpr_mu_lock(&t->executor.mu); @@ -669,8 +707,28 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, continue; } else { t->executor.global_active = false; + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking"); + REF_TRANSPORT(t, "initiate_writing"); + gpr_mu_unlock(&t->executor.mu); + grpc_exec_ctx_sched( + exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE, + t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL); + break; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + start_writing(exec_ctx, t); + gpr_mu_unlock(&t->executor.mu); + break; + case GRPC_CHTTP2_WRITING_INACTIVE: + case GRPC_CHTTP2_WRITING: + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + case GRPC_CHTTP2_WRITE_SCHEDULED: + gpr_mu_unlock(&t->executor.mu); + break; + } } - gpr_mu_unlock(&t->executor.mu); break; } @@ -741,16 +799,118 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, * OUTPUT PROCESSING */ -void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { +void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + bool covered_by_poller, const char *reason) { + /* Perform state checks, and transition to a scheduled state if appropriate. + Each time we finish the global lock execution, we check if we need to + write. If we do: + - (if there is a poller surrounding the write) schedule + initiate_writing, which locks and calls initiate_writing_locked to... + - call start_writing, which verifies (under the global lock) that there + are things that need to be written by calling + grpc_chttp2_unlocking_check_writes, and if so schedules writing_action + against the current exec_ctx, to be executed OUTSIDE of the global lock + - eventually writing_action results in grpc_chttp2_terminate_writing being + called, which re-takes the global lock, updates state, checks if we need + to do *another* write immediately, and if so loops back to + start_writing. + + Current problems: + - too much lock entry/exiting + - the writing thread can become stuck indefinitely (punt through the + workqueue periodically to fix) */ + + grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + set_write_state(t, covered_by_poller + ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER + : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + reason); + break; + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + /* nothing to do: write already requested */ + break; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + if (covered_by_poller) { + /* upgrade to note poller is available to cover the write */ + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason); + } + break; + case GRPC_CHTTP2_WRITE_SCHEDULED: + /* nothing to do: write already scheduled */ + break; + case GRPC_CHTTP2_WRITING: + set_write_state(t, + covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER + : GRPC_CHTTP2_WRITING_STALE_NO_POLLER, + reason); + break; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + /* nothing to do: write already requested */ + break; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + if (covered_by_poller) { + /* upgrade to note poller is available to cover the write */ + set_write_state(t, GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, reason); + } + break; + } +} + +static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { + GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED || + t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER); + if (!t->closed && + grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { + set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing"); + REF_TRANSPORT(t, "writing"); + prevent_endpoint_shutdown(t); + grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); + } else { + if (t->closed) { + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, + "start_writing:transport_closed"); + } else { + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, + "start_writing:nothing_to_write"); + } + end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write")); + if (t->ep && !t->endpoint_reading) { + destroy_endpoint(exec_ctx, t); + } + } +} + +static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, + void *arg_ignored) { + start_writing(exec_ctx, t); + UNREF_TRANSPORT(exec_ctx, t, "initiate_writing"); +} + +static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked, + NULL, 0); +} + +void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + bool covered_by_poller, const char *reason) { if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); + grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller, + reason); } } -static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, - uint32_t value) { +static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_setting_id id, uint32_t value) { const grpc_chttp2_setting_parameters *sp = &grpc_chttp2_settings_parameters[id]; uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value); @@ -761,9 +921,22 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) { t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value; t->global.dirtied_local_settings = 1; + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "push_setting"); } } +static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_error *error) { + grpc_chttp2_stream_global *stream_global; + while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, + &stream_global)) { + fail_pending_writes(exec_ctx, &t->global, stream_global, + GRPC_ERROR_REF(error)); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); + } + GRPC_ERROR_UNREF(error); +} + static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, @@ -778,24 +951,32 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); - grpc_chttp2_stream_global *stream_global; - while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, - &stream_global)) { - fail_pending_writes(exec_ctx, &t->global, stream_global, - GRPC_ERROR_REF(error)); - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); + end_waiting_for_write(exec_ctx, t, error); + + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + case GRPC_CHTTP2_WRITE_SCHEDULED: + GPR_UNREACHABLE_CODE(break); + case GRPC_CHTTP2_WRITING: + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing"); + break; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, + "terminate_writing"); + break; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + "terminate_writing"); + break; } - /* leave the writing flag up on shutdown to prevent further writes in - unlock() - from starting */ - t->executor.writing_active = 0; if (t->ep && !t->endpoint_reading) { destroy_endpoint(exec_ctx, t); } UNREF_TRANSPORT(exec_ctx, t, "writing"); - GRPC_ERROR_UNREF(error); } void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, @@ -878,7 +1059,8 @@ static void maybe_start_some_streams( stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->in_stream_map = true; transport_global->concurrent_stream_count++; - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, true, + "new_stream"); } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -1018,9 +1200,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, maybe_start_some_streams(exec_ctx, transport_global); } else { GPR_ASSERT(stream_global->id != 0); - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_initial_metadata"); } } else { + stream_global->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_initial_metadata_finished, @@ -1042,7 +1226,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, } else { stream_global->send_message = op->send_message; if (stream_global->id != 0) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_message"); } } } @@ -1075,6 +1260,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (stream_global->write_closed) { + stream_global->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_trailing_metadata_finished, @@ -1085,7 +1271,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, } else if (stream_global->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_trailing_metadata"); } } } @@ -1106,8 +1293,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, (stream_global->incoming_frames.head == NULL || stream_global->incoming_frames.head->is_tail)) { incoming_byte_stream_update_flow_control( - transport_global, stream_global, transport_global->stream_lookahead, - 0); + exec_ctx, transport_global, stream_global, + transport_global->stream_lookahead, 0); } grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } @@ -1135,7 +1322,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, sizeof(*op)); } -static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { +static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_closure *on_recv) { grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); p->next = &t->global.pings; p->prev = p->next->prev; @@ -1150,6 +1338,7 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { p->id[7] = (uint8_t)(t->global.ping_counter & 0xff); p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); + grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping"); } static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1209,6 +1398,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, close_transport = grpc_chttp2_has_streams(t) ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("GOAWAY sent"); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "goaway_sent"); } if (op->set_accept_stream) { @@ -1226,7 +1416,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->send_ping) { - send_ping_locked(t, op->send_ping); + send_ping_locked(exec_ctx, t, op->send_ping); } if (close_transport != GRPC_ERROR_NONE) { @@ -1414,6 +1604,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, &transport_global->qbuf, grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error, &stream_global->stats.outgoing)); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "rst_stream"); } const char *msg = @@ -1473,10 +1665,39 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, } } +static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) { + if (error == GRPC_ERROR_NONE) return; + for (size_t i = 0; i < *nrefs; i++) { + if (error == refs[i]) { + return; + } + } + refs[*nrefs] = error; + ++*nrefs; +} + +static grpc_error *removal_error(grpc_error *extra_error, + grpc_chttp2_stream_global *stream_global) { + grpc_error *refs[3]; + size_t nrefs = 0; + add_error(stream_global->read_closed_error, refs, &nrefs); + add_error(stream_global->write_closed_error, refs, &nrefs); + add_error(extra_error, refs, &nrefs); + grpc_error *error = GRPC_ERROR_NONE; + if (nrefs > 0) { + error = GRPC_ERROR_CREATE_REFERENCING("Failed due to stream removal", refs, + nrefs); + } + GRPC_ERROR_UNREF(extra_error); + return error; +} + static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_error *error) { + error = removal_error(error, stream_global); + stream_global->send_message = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_initial_metadata_finished, GRPC_ERROR_REF(error)); @@ -1499,14 +1720,17 @@ void grpc_chttp2_mark_stream_closed( } grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); if (close_reads && !stream_global->read_closed) { + stream_global->read_closed_error = GRPC_ERROR_REF(error); stream_global->read_closed = true; stream_global->published_initial_metadata = true; stream_global->published_trailing_metadata = true; decrement_active_streams_locked(exec_ctx, transport_global, stream_global); } if (close_writes && !stream_global->write_closed) { + stream_global->write_closed_error = GRPC_ERROR_REF(error); stream_global->write_closed = true; - if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) { + if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.write_state != + GRPC_CHTTP2_WRITING_INACTIVE) { GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes"); grpc_chttp2_list_add_closed_waiting_for_writing(transport_global, stream_global); @@ -1516,7 +1740,6 @@ void grpc_chttp2_mark_stream_closed( } } if (stream_global->read_closed && stream_global->write_closed) { - stream_global->removal_error = GRPC_ERROR_REF(error); if (stream_global->id != 0 && TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) { grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, @@ -1524,7 +1747,8 @@ void grpc_chttp2_mark_stream_closed( } else { if (stream_global->id != 0) { remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), - stream_global->id, GRPC_ERROR_REF(error)); + stream_global->id, + removal_error(GRPC_ERROR_REF(error), stream_global)); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } @@ -1649,6 +1873,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, 1, error); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "close_from_api"); } typedef struct { @@ -1678,8 +1904,14 @@ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } /** update window from a settings change */ +typedef struct { + grpc_chttp2_transport *t; + grpc_exec_ctx *exec_ctx; +} update_global_window_args; + static void update_global_window(void *args, uint32_t id, void *stream) { - grpc_chttp2_transport *t = args; + update_global_window_args *a = args; + grpc_chttp2_transport *t = a->t; grpc_chttp2_stream *s = stream; grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_stream_global *stream_global = &s->global; @@ -1693,7 +1925,8 @@ static void update_global_window(void *args, uint32_t id, void *stream) { is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(a->exec_ctx, transport_global, stream_global, + true, "update_global_window"); } } @@ -1772,6 +2005,7 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = arg; + grpc_error *err = GRPC_ERROR_NONE; GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, @@ -1780,15 +2014,13 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing, t->read_buffer.slices[i]); }; - if (i != t->read_buffer.count) { + if (errors[1] == GRPC_ERROR_NONE) { + err = GRPC_ERROR_REF(error); + } else { errors[2] = try_http_parsing(exec_ctx, t); + err = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, + GPR_ARRAY_SIZE(errors)); } - grpc_error *err = - errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE && - errors[2] == GRPC_ERROR_NONE - ? GRPC_ERROR_NONE - : GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, - GPR_ARRAY_SIZE(errors)); for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) { GRPC_ERROR_UNREF(errors[i]); } @@ -1802,14 +2034,19 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; /* copy parsing qbuf to global qbuf */ - gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); + if (t->parsing.qbuf.count > 0) { + gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "parsing_qbuf"); + } /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); transport_global->concurrent_stream_count = (uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map); if (transport_parsing->initial_window_update != 0) { + update_global_window_args args = {t, exec_ctx}; grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, - update_global_window, t); + update_global_window, &args); transport_parsing->initial_window_update = 0; } /* handle higher level things */ @@ -1832,7 +2069,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(stream_global->write_closed); GPR_ASSERT(stream_global->read_closed); remove_stream(exec_ctx, t, stream_global->id, - GRPC_ERROR_REF(stream_global->removal_error)); + removal_error(GRPC_ERROR_NONE, stream_global)); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } @@ -1855,11 +2092,12 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, } drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; - if (!t->executor.writing_active && t->ep) { - grpc_endpoint_destroy(exec_ctx, t->ep); - t->ep = NULL; - /* safe as we still have a ref for read */ - UNREF_TRANSPORT(exec_ctx, t, "disconnect"); + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t, + write_state_name(t->executor.write_state)); + } + if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) { + destroy_endpoint(exec_ctx, t); } } else if (!t->closed) { keep_reading = true; @@ -1943,7 +2181,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, } static void incoming_byte_stream_update_flow_control( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already) { uint32_t max_recv_bytes; @@ -1978,7 +2216,8 @@ static void incoming_byte_stream_update_flow_control( add_max_recv_bytes); grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, stream_global); - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "read_incoming_stream"); } } @@ -2000,8 +2239,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global = &bs->stream->global; if (bs->is_tail) { - incoming_byte_stream_update_flow_control( - transport_global, stream_global, arg->max_size_hint, bs->slices.length); + incoming_byte_stream_update_flow_control(exec_ctx, transport_global, + stream_global, arg->max_size_hint, + bs->slices.length); } if (bs->slices.count > 0) { *arg->slice = gpr_slice_buffer_take_first(&bs->slices); @@ -2185,7 +2425,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, if (context == NULL) { *scope = NULL; gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val); - result = gpr_leftpad(buf, ' ', 40); + result = gpr_leftpad(buf, ' ', 60); gpr_free(buf); return result; } @@ -2198,7 +2438,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, gpr_free(tmp); } gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val); - result = gpr_leftpad(buf, ' ', 40); + result = gpr_leftpad(buf, ' ', 60); gpr_free(buf); return result; } @@ -2231,7 +2471,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, tmp_phase = gpr_leftpad(phase, ' ', 8); tmp_scope1 = gpr_leftpad(scope1, ' ', 11); - gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1); + gpr_asprintf(&prefix, "FLOW %s: %s %s ", tmp_phase, clisvr, scope1); gpr_free(tmp_phase); gpr_free(tmp_scope1); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 8d79e93ceb..e1dcf5262a 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -305,6 +305,22 @@ typedef struct grpc_chttp2_executor_action_header { void *arg; } grpc_chttp2_executor_action_header; +typedef enum { + /** no writing activity */ + GRPC_CHTTP2_WRITING_INACTIVE, + /** write has been requested, but not scheduled yet */ + GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, + GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + /** write has been requested and scheduled against the workqueue */ + GRPC_CHTTP2_WRITE_SCHEDULED, + /** write has been initiated after being reaped from the workqueue */ + GRPC_CHTTP2_WRITING, + /** write has been initiated, AND another write needs to be started once it's + done */ + GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, + GRPC_CHTTP2_WRITING_STALE_NO_POLLER, +} grpc_chttp2_write_state; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -319,10 +335,10 @@ struct grpc_chttp2_transport { /** is a thread currently in the global lock */ bool global_active; - /** is a thread currently writing */ - bool writing_active; /** is a thread currently parsing */ bool parsing_active; + /** write execution state of the transport */ + grpc_chttp2_write_state write_state; grpc_chttp2_executor_action_header *pending_actions_head; grpc_chttp2_executor_action_header *pending_actions_tail; @@ -342,7 +358,8 @@ struct grpc_chttp2_transport { /** global state for reading/writing */ grpc_chttp2_transport_global global; /** state only accessible by the chain of execution that - set writing_active=1 */ + set writing_state >= GRPC_WRITING, and only by the writing closure + chain. */ grpc_chttp2_transport_writing writing; /** state only accessible by the chain of execution that set parsing_active=1 */ @@ -363,6 +380,8 @@ struct grpc_chttp2_transport { grpc_closure reading_action; /** closure to actually do parsing */ grpc_closure parsing_action; + /** closure to initiate writing */ + grpc_closure initiate_writing; /** incoming read bytes */ gpr_slice_buffer read_buffer; @@ -436,8 +455,10 @@ typedef struct { bool seen_error; bool exceeded_metadata_size; - /** the error that resulted in this stream being removed */ - grpc_error *removal_error; + /** the error that resulted in this stream being read-closed */ + grpc_error *read_closed_error; + /** the error that resulted in this stream being write-closed */ + grpc_error *write_closed_error; bool published_initial_metadata; bool published_trailing_metadata; @@ -514,15 +535,20 @@ struct grpc_chttp2_stream { }; /** Transport writing call flow: - chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes - are required; - if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the - writes. - Once writes have been completed (meaning another write could potentially be - started), - grpc_chttp2_terminate_writing is called. This will call - grpc_chttp2_cleanup_writing, at which - point the write phase is complete. */ + grpc_chttp2_initiate_write() is called anywhere that we know bytes need to + go out on the wire. + If no other write has been started, a task is enqueued onto our workqueue. + When that task executes, it obtains the global lock, and gathers the data + to write. + The global lock is dropped and we do the syscall to write. + After writing, a follow-up check is made to see if another round of writing + should be performed. + + The actual call chain is documented in the implementation of this function. + */ +void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + bool covered_by_poller, const char *reason); /** Someone is unlocking the transport mutex: check to see if writes are required, and schedule them if so */ @@ -610,9 +636,8 @@ int grpc_chttp2_list_pop_check_read_ops( void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); -void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, - bool is_window_available); +bool grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing); void grpc_chttp2_list_add_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, @@ -822,7 +847,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, /** add a ref to the stream and add it to the writable list; ref will be dropped in writing.c */ -void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); +void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + bool covered_by_poller, const char *reason); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 84eb5752f1..e1fc0ddee2 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -154,10 +154,8 @@ void grpc_chttp2_publish_reads( transport_parsing, outgoing_window); is_zero = transport_global->outgoing_window <= 0; if (was_zero && !is_zero) { - while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, - &stream_global)) { - grpc_chttp2_become_writable(transport_global, stream_global); - } + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "new_global_flow_control"); } if (transport_parsing->incoming_window < @@ -168,6 +166,8 @@ void grpc_chttp2_publish_reads( announce_incoming_window, announce_bytes); GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing, incoming_window, announce_bytes); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "global incoming window"); } /* for each stream that saw an update, fixup global state */ @@ -190,7 +190,8 @@ void grpc_chttp2_publish_reads( outgoing_window); is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "stream.read_flow_control"); } stream_global->max_recv_bytes -= (uint32_t)GPR_MIN( diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 8f3ab00e6d..2eb5f5f632 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -329,6 +329,7 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing); + gpr_log(GPR_DEBUG, "writing stalled %d", stream->global.id); if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) { GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled"); } @@ -336,27 +337,28 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT); } -void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, - bool is_window_available) { +bool grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream *stream; + bool out = false; grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); while (stream_list_pop(transport, &stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { - if (is_window_available) { - grpc_chttp2_become_writable(&transport->global, &stream->global); - } else { - grpc_chttp2_list_add_stalled_by_transport(transport_writing, - &stream->writing); - } + gpr_log(GPR_DEBUG, "move %d from writing stalled to just stalled", + stream->global.id); + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + &stream->writing); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global, "chttp2_writing_stalled"); + out = true; } + return out; } void grpc_chttp2_list_add_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { + gpr_log(GPR_DEBUG, "stalled %d", stream_writing->id); stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index a2ac8e8ec9..8f7a1f55c6 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -75,9 +75,13 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, transport_global, outgoing_window); - bool is_window_available = transport_writing->outgoing_window > 0; - grpc_chttp2_list_flush_writing_stalled_by_transport( - exec_ctx, transport_writing, is_window_available); + if (transport_writing->outgoing_window > 0) { + while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, + &stream_global)) { + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "transport.read_flow_control"); + } + } /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ @@ -337,6 +341,12 @@ void grpc_chttp2_cleanup_writing( grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; + if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx, + transport_writing)) { + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "resume_stalled_stream"); + } + while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { if (stream_writing->sent_initial_metadata) { diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 653d04f427..aec61ee7c6 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -37,6 +37,8 @@ #include <grpc/compression.h> #include <grpc/grpc.h> +// Channel args are intentionally immutable, to avoid the need for locking. + /** Copy the arguments in \a src into a new instance */ grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src); diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c new file mode 100644 index 0000000000..6c3ca198b7 --- /dev/null +++ b/src/core/lib/channel/handshaker.c @@ -0,0 +1,197 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <string.h> + +#include <grpc/impl/codegen/alloc.h> +#include <grpc/impl/codegen/log.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" + +// +// grpc_handshaker +// + +void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable, + grpc_handshaker* handshaker) { + handshaker->vtable = vtable; +} + +void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) { + handshaker->vtable->destroy(exec_ctx, handshaker); +} + +void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) { + handshaker->vtable->shutdown(exec_ctx, handshaker); +} + +void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, + grpc_endpoint* endpoint, + grpc_channel_args* args, + gpr_timespec deadline, + grpc_tcp_server_acceptor* acceptor, + grpc_handshaker_done_cb cb, void* user_data) { + handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args, + deadline, acceptor, cb, user_data); +} + +// +// grpc_handshake_manager +// + +// State used while chaining handshakers. +struct grpc_handshaker_state { + // The index of the handshaker to invoke next. + size_t index; + // The deadline for all handshakers. + gpr_timespec deadline; + // The acceptor to call the handshakers with. + grpc_tcp_server_acceptor* acceptor; + // The final callback and user_data to invoke after the last handshaker. + grpc_handshaker_done_cb final_cb; + void* final_user_data; +}; + +struct grpc_handshake_manager { + // An array of handshakers added via grpc_handshake_manager_add(). + size_t count; + grpc_handshaker** handshakers; + // State used while chaining handshakers. + struct grpc_handshaker_state* state; +}; + +grpc_handshake_manager* grpc_handshake_manager_create() { + grpc_handshake_manager* mgr = gpr_malloc(sizeof(grpc_handshake_manager)); + memset(mgr, 0, sizeof(*mgr)); + return mgr; +} + +static bool is_power_of_2(size_t n) { return (n & (n - 1)) == 0; } + +void grpc_handshake_manager_add(grpc_handshake_manager* mgr, + grpc_handshaker* handshaker) { + // To avoid allocating memory for each handshaker we add, we double + // the number of elements every time we need more. + size_t realloc_count = 0; + if (mgr->count == 0) { + realloc_count = 2; + } else if (mgr->count >= 2 && is_power_of_2(mgr->count)) { + realloc_count = mgr->count * 2; + } + if (realloc_count > 0) { + mgr->handshakers = + gpr_realloc(mgr->handshakers, realloc_count * sizeof(grpc_handshaker*)); + } + mgr->handshakers[mgr->count++] = handshaker; +} + +void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr) { + for (size_t i = 0; i < mgr->count; ++i) { + grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]); + } + gpr_free(mgr->handshakers); + gpr_free(mgr); +} + +void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr) { + for (size_t i = 0; i < mgr->count; ++i) { + grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]); + } + if (mgr->state != NULL) { + gpr_free(mgr->state); + mgr->state = NULL; + } +} + +// A function used as the handshaker-done callback when chaining +// handshakers together. +static void call_next_handshaker(grpc_exec_ctx* exec_ctx, + grpc_endpoint* endpoint, + grpc_channel_args* args, void* user_data, + grpc_error* error) { + grpc_handshake_manager* mgr = user_data; + GPR_ASSERT(mgr->state != NULL); + GPR_ASSERT(mgr->state->index < mgr->count); + // If we got an error, skip all remaining handshakers and invoke the + // caller-supplied callback immediately. + if (error != GRPC_ERROR_NONE) { + mgr->state->final_cb(exec_ctx, endpoint, args, mgr->state->final_user_data, + error); + return; + } + grpc_handshaker_done_cb cb = call_next_handshaker; + // If this is the last handshaker, use the caller-supplied callback + // and user_data instead of chaining back to this function again. + if (mgr->state->index == mgr->count - 1) { + cb = mgr->state->final_cb; + user_data = mgr->state->final_user_data; + } + // Invoke handshaker. + grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->state->index], + endpoint, args, mgr->state->deadline, + mgr->state->acceptor, cb, user_data); + ++mgr->state->index; + // If this is the last handshaker, clean up state. + if (mgr->state->index == mgr->count) { + gpr_free(mgr->state); + mgr->state = NULL; + } +} + +void grpc_handshake_manager_do_handshake( + grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, + grpc_endpoint* endpoint, const grpc_channel_args* args, + gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, + grpc_handshaker_done_cb cb, void* user_data) { + grpc_channel_args* args_copy = grpc_channel_args_copy(args); + if (mgr->count == 0) { + // No handshakers registered, so we just immediately call the done + // callback with the passed-in endpoint. + cb(exec_ctx, endpoint, args_copy, user_data, GRPC_ERROR_NONE); + } else { + GPR_ASSERT(mgr->state == NULL); + mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state)); + memset(mgr->state, 0, sizeof(*mgr->state)); + mgr->state->deadline = deadline; + mgr->state->acceptor = acceptor; + mgr->state->final_cb = cb; + mgr->state->final_user_data = user_data; + call_next_handshaker(exec_ctx, endpoint, args_copy, mgr, GRPC_ERROR_NONE); + } +} diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h new file mode 100644 index 0000000000..dfc469c417 --- /dev/null +++ b/src/core/lib/channel/handshaker.h @@ -0,0 +1,145 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H +#define GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/impl/codegen/time.h> + +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/tcp_server.h" + +/// Handshakers are used to perform initial handshakes on a connection +/// before the client sends the initial request. Some examples of what +/// a handshaker can be used for includes support for HTTP CONNECT on +/// the client side and various types of security initialization. +/// +/// In general, handshakers should be used via a handshake manager. + +/// +/// grpc_handshaker +/// + +typedef struct grpc_handshaker grpc_handshaker; + +/// Callback type invoked when a handshaker is done. +/// Takes ownership of \a args. +typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx, + grpc_endpoint* endpoint, + grpc_channel_args* args, + void* user_data, grpc_error* error); + +struct grpc_handshaker_vtable { + /// Destroys the handshaker. + void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); + + /// Shuts down the handshaker (e.g., to clean up when the operation is + /// aborted in the middle). + void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); + + /// Performs handshaking. When finished, calls \a cb with \a user_data. + /// Takes ownership of \a args. + /// \a acceptor will be NULL for client-side handshakers. + void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, + grpc_endpoint* endpoint, grpc_channel_args* args, + gpr_timespec deadline, + grpc_tcp_server_acceptor* acceptor, + grpc_handshaker_done_cb cb, void* user_data); +}; + +/// Base struct. To subclass, make this the first member of the +/// implementation struct. +struct grpc_handshaker { + const struct grpc_handshaker_vtable* vtable; +}; + +/// Called by concrete implementations to initialize the base struct. +void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable, + grpc_handshaker* handshaker); + +/// Convenient wrappers for invoking methods via the vtable. +/// These probably do not need to be called from anywhere but +/// grpc_handshake_manager. +void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker); +void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker); +void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, + grpc_endpoint* endpoint, + grpc_channel_args* args, + gpr_timespec deadline, + grpc_tcp_server_acceptor* acceptor, + grpc_handshaker_done_cb cb, void* user_data); + +/// +/// grpc_handshake_manager +/// + +typedef struct grpc_handshake_manager grpc_handshake_manager; + +/// Creates a new handshake manager. Caller takes ownership. +grpc_handshake_manager* grpc_handshake_manager_create(); + +/// Adds a handshaker to the handshake manager. +/// Takes ownership of \a handshaker. +void grpc_handshake_manager_add(grpc_handshake_manager* mgr, + grpc_handshaker* handshaker); + +/// Destroys the handshake manager. +void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr); + +/// Shuts down the handshake manager (e.g., to clean up when the operation is +/// aborted in the middle). +/// The caller must still call grpc_handshake_manager_destroy() after +/// calling this function. +void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr); + +/// Invokes handshakers in the order they were added. +/// Does NOT take ownership of \a args. Instead, makes a copy before +/// invoking the first handshaker. +/// \a acceptor will be NULL for client-side handshakers. +/// Invokes \a cb with \a user_data after either a handshaker fails or +/// all handshakers have completed successfully. +void grpc_handshake_manager_do_handshake( + grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, + grpc_endpoint* endpoint, const grpc_channel_args* args, + gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, + grpc_handshaker_done_cb cb, void* user_data); + +#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */ diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index 1ab3733d38..f901fcf962 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -65,3 +65,7 @@ void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { char* grpc_endpoint_get_peer(grpc_endpoint* ep) { return ep->vtable->get_peer(ep); } + +grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) { + return ep->vtable->get_workqueue(ep); +} diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index f9808bbda1..894efc0b23 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -51,6 +51,7 @@ struct grpc_endpoint_vtable { gpr_slice_buffer *slices, grpc_closure *cb); void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb); + grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep); void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -69,6 +70,9 @@ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, char *grpc_endpoint_get_peer(grpc_endpoint *ep); +/* Retrieve a reference to the workqueue associated with this endpoint */ +grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep); + /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index cf0fe736a0..6a63c4d1d1 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -57,6 +57,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -113,9 +114,7 @@ struct grpc_fd { grpc_closure *read_closure; grpc_closure *write_closure; - /* The polling island to which this fd belongs to and the mutex protecting the - the field */ - gpr_mu pi_mu; + /* The polling island to which this fd belongs to (protected by mu) */ struct polling_island *polling_island; struct grpc_fd *freelist_next; @@ -152,16 +151,17 @@ static void fd_global_shutdown(void); * Polling island Declarations */ -// #define GRPC_PI_REF_COUNT_DEBUG +//#define GRPC_PI_REF_COUNT_DEBUG #ifdef GRPC_PI_REF_COUNT_DEBUG #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__) -#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__) +#define PI_UNREF(exec_ctx, p, r) \ + pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__) #else /* defined(GRPC_PI_REF_COUNT_DEBUG) */ #define PI_ADD_REF(p, r) pi_add_ref((p)) -#define PI_UNREF(p, r) pi_unref((p)) +#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p)) #endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */ @@ -172,7 +172,7 @@ typedef struct polling_island { Once the ref count becomes zero, this structure is destroyed which means we should ensure that there is never a scenario where a PI_ADD_REF() is racing with a PI_UNREF() that just made the ref_count zero. */ - gpr_refcount ref_count; + gpr_atm ref_count; /* Pointer to the polling_island this merged into. * merged_to value is only set once in polling_island's lifetime (and that too @@ -184,6 +184,9 @@ typedef struct polling_island { * (except mu and ref_count) are invalid and must be ignored. */ gpr_atm merged_to; + /* The workqueue associated with this polling island */ + grpc_workqueue *workqueue; + /* The fd of the underlying epoll set */ int epoll_fd; @@ -191,11 +194,6 @@ typedef struct polling_island { size_t fd_cnt; size_t fd_capacity; grpc_fd **fds; - - /* Polling islands that are no longer needed are kept in a freelist so that - they can be reused. This field points to the next polling island in the - free list */ - struct polling_island *next_free; } polling_island; /******************************************************************************* @@ -253,13 +251,14 @@ struct grpc_pollset_set { * Common helpers */ -static void append_error(grpc_error **composite, grpc_error *error, +static bool append_error(grpc_error **composite, grpc_error *error, const char *desc) { - if (error == GRPC_ERROR_NONE) return; + if (error == GRPC_ERROR_NONE) return true; if (*composite == GRPC_ERROR_NONE) { *composite = GRPC_ERROR_CREATE(desc); } *composite = grpc_error_add_child(*composite, error); + return false; } /******************************************************************************* @@ -275,11 +274,8 @@ static void append_error(grpc_error **composite, grpc_error *error, threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */ static grpc_wakeup_fd polling_island_wakeup_fd; -/* Polling island freelist */ -static gpr_mu g_pi_freelist_mu; -static polling_island *g_pi_freelist = NULL; - -static void polling_island_delete(); /* Forward declaration */ +/* Forward declaration */ +static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -293,28 +289,35 @@ gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ #ifdef GRPC_PI_REF_COUNT_DEBUG -void pi_add_ref(polling_island *pi); -void pi_unref(polling_island *pi); +static void pi_add_ref(polling_island *pi); +static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi); -void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, int line) { - long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count)); +static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, + int line) { + long old_cnt = gpr_atm_acq_load(&pi->ref_count); pi_add_ref(pi); gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, old_cnt + 1, reason, file, line); } -void pi_unref_dbg(polling_island *pi, char *reason, char *file, int line) { - long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count)); - pi_unref(pi); +static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi, + char *reason, char *file, int line) { + long old_cnt = gpr_atm_acq_load(&pi->ref_count); + pi_unref(exec_ctx, pi); gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, (old_cnt - 1), reason, file, line); } #endif -void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); } +static void pi_add_ref(polling_island *pi) { + gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1); +} -void pi_unref(polling_island *pi) { - /* If ref count went to zero, delete the polling island. +static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) { + /* If ref count went to one, we're back to just the workqueue owning a ref. + Unref the workqueue to break the loop. + + If ref count went to zero, delete the polling island. Note that this deletion not be done under a lock. Once the ref count goes to zero, we are guaranteed that no one else holds a reference to the polling island (and that there is no racing pi_add_ref() call either). @@ -322,12 +325,20 @@ void pi_unref(polling_island *pi) { Also, if we are deleting the polling island and the merged_to field is non-empty, we should remove a ref to the merged_to polling island */ - if (gpr_unref(&pi->ref_count)) { - polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); - polling_island_delete(pi); - if (next != NULL) { - PI_UNREF(next, "pi_delete"); /* Recursive call */ + switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) { + case 2: /* last external ref: the only one now owned is by the workqueue */ + GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island"); + break; + case 1: { + polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); + polling_island_delete(exec_ctx, pi); + if (next != NULL) { + PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */ + } + break; } + case 0: + GPR_UNREACHABLE_CODE(return ); } } @@ -462,69 +473,68 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd, } /* Might return NULL in case of an error */ -static polling_island *polling_island_create(grpc_fd *initial_fd, +static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, + grpc_fd *initial_fd, grpc_error **error) { polling_island *pi = NULL; - char *err_msg; const char *err_desc = "polling_island_create"; - /* Try to get one from the polling island freelist */ - gpr_mu_lock(&g_pi_freelist_mu); - if (g_pi_freelist != NULL) { - pi = g_pi_freelist; - g_pi_freelist = g_pi_freelist->next_free; - pi->next_free = NULL; - } - gpr_mu_unlock(&g_pi_freelist_mu); + *error = GRPC_ERROR_NONE; - /* Create new polling island if we could not get one from the free list */ - if (pi == NULL) { - pi = gpr_malloc(sizeof(*pi)); - gpr_mu_init(&pi->mu); - pi->fd_cnt = 0; - pi->fd_capacity = 0; - pi->fds = NULL; - } + pi = gpr_malloc(sizeof(*pi)); + gpr_mu_init(&pi->mu); + pi->fd_cnt = 0; + pi->fd_capacity = 0; + pi->fds = NULL; + pi->epoll_fd = -1; + pi->workqueue = NULL; - gpr_ref_init(&pi->ref_count, 0); + gpr_atm_rel_store(&pi->ref_count, 0); gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (pi->epoll_fd < 0) { - gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno, - strerror(errno)); - append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); - gpr_free(err_msg); - } else { - polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); - pi->next_free = NULL; + append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc); + goto done; + } - if (initial_fd != NULL) { - /* Lock the polling island here just in case we got this structure from - the freelist and the polling island lock was not released yet (by the - code that adds the polling island to the freelist) */ - gpr_mu_lock(&pi->mu); - polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); - gpr_mu_unlock(&pi->mu); - } + polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); + + if (initial_fd != NULL) { + polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); + } + + if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue), + err_desc) && + *error == GRPC_ERROR_NONE) { + polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true, + error); + GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL); + pi->workqueue->wakeup_read_fd->polling_island = pi; + PI_ADD_REF(pi, "fd"); } +done: + if (*error != GRPC_ERROR_NONE) { + if (pi->workqueue != NULL) { + GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island"); + } + polling_island_delete(exec_ctx, pi); + pi = NULL; + } return pi; } -static void polling_island_delete(polling_island *pi) { +static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) { GPR_ASSERT(pi->fd_cnt == 0); - gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); - - close(pi->epoll_fd); - pi->epoll_fd = -1; - - gpr_mu_lock(&g_pi_freelist_mu); - pi->next_free = g_pi_freelist; - g_pi_freelist = pi; - gpr_mu_unlock(&g_pi_freelist_mu); + if (pi->epoll_fd >= 0) { + close(pi->epoll_fd); + } + gpr_mu_destroy(&pi->mu); + gpr_free(pi->fds); + gpr_free(pi); } /* Attempts to gets the last polling island in the linked list (liked by the @@ -704,9 +714,6 @@ static polling_island *polling_island_merge(polling_island *p, static grpc_error *polling_island_global_init() { grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_init(&g_pi_freelist_mu); - g_pi_freelist = NULL; - error = grpc_wakeup_fd_init(&polling_island_wakeup_fd); if (error == GRPC_ERROR_NONE) { error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd); @@ -716,18 +723,6 @@ static grpc_error *polling_island_global_init() { } static void polling_island_global_shutdown() { - polling_island *next; - gpr_mu_lock(&g_pi_freelist_mu); - gpr_mu_unlock(&g_pi_freelist_mu); - while (g_pi_freelist != NULL) { - next = g_pi_freelist->next_free; - gpr_mu_destroy(&g_pi_freelist->mu); - gpr_free(g_pi_freelist->fds); - gpr_free(g_pi_freelist); - g_pi_freelist = next; - } - gpr_mu_destroy(&g_pi_freelist_mu); - grpc_wakeup_fd_destroy(&polling_island_wakeup_fd); } @@ -845,7 +840,6 @@ static grpc_fd *fd_create(int fd, const char *name) { if (new_fd == NULL) { new_fd = gpr_malloc(sizeof(grpc_fd)); gpr_mu_init(&new_fd->mu); - gpr_mu_init(&new_fd->pi_mu); } /* Note: It is not really needed to get the new_fd->mu lock here. If this is a @@ -896,6 +890,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, const char *reason) { bool is_fd_closed = false; grpc_error *error = GRPC_ERROR_NONE; + polling_island *unref_pi = NULL; gpr_mu_lock(&fd->mu); fd->on_done_closure = on_done; @@ -923,21 +918,26 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - Unlock the latest polling island - Set fd->polling_island to NULL (but remove the ref on the polling island before doing this.) */ - gpr_mu_lock(&fd->pi_mu); if (fd->polling_island != NULL) { polling_island *pi_latest = polling_island_lock(fd->polling_island); polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error); gpr_mu_unlock(&pi_latest->mu); - PI_UNREF(fd->polling_island, "fd_orphan"); + unref_pi = fd->polling_island; fd->polling_island = NULL; } - gpr_mu_unlock(&fd->pi_mu); grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL); gpr_mu_unlock(&fd->mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ + if (unref_pi != NULL) { + /* Unref stale polling island here, outside the fd lock above. + The polling island owns a workqueue which owns an fd, and unreffing + inside the lock can cause an eventual lock loop that makes TSAN very + unhappy. */ + PI_UNREF(exec_ctx, unref_pi, "fd_orphan"); + } GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); } @@ -1037,6 +1037,17 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_mu_unlock(&fd->mu); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { + gpr_mu_lock(&fd->mu); + grpc_workqueue *workqueue = NULL; + if (fd->polling_island != NULL) { + workqueue = + GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue"); + } + gpr_mu_unlock(&fd->mu); + return workqueue; +} + /******************************************************************************* * Pollset Definitions */ @@ -1227,9 +1238,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_unlock(&fd->mu); } -static void pollset_release_polling_island(grpc_pollset *ps, char *reason) { +static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, + grpc_pollset *ps, char *reason) { if (ps->polling_island != NULL) { - PI_UNREF(ps->polling_island, reason); + PI_UNREF(exec_ctx, ps->polling_island, reason); } ps->polling_island = NULL; } @@ -1242,7 +1254,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, pollset->finish_shutdown_called = true; /* Release the ref and set pollset->polling_island to NULL */ - pollset_release_polling_island(pollset, "ps_shutdown"); + pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown"); grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } @@ -1281,7 +1293,7 @@ static void pollset_reset(grpc_pollset *pollset) { pollset->finish_shutdown_called = false; pollset->kicked_without_pollers = false; pollset->shutdown_done = NULL; - pollset_release_polling_island(pollset, "ps_reset"); + GPR_ASSERT(pollset->polling_island == NULL); } #define GRPC_EPOLL_MAX_EVENTS 1000 @@ -1309,7 +1321,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, this function (i.e pollset_work_and_unlock()) is called */ if (pollset->polling_island == NULL) { - pollset->polling_island = polling_island_create(NULL, error); + pollset->polling_island = polling_island_create(exec_ctx, NULL, error); if (pollset->polling_island == NULL) { GPR_TIMER_END("pollset_work_and_unlock", 0); return; /* Fatal error. We cannot continue */ @@ -1329,7 +1341,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the polling island to be deleted */ PI_ADD_REF(pi, "ps"); - PI_UNREF(pollset->polling_island, "ps"); + PI_UNREF(exec_ctx, pollset->polling_island, "ps"); pollset->polling_island = pi; } @@ -1400,7 +1412,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, that we got before releasing the polling island lock). This is because pollset->polling_island pointer might get udpated in other parts of the code when there is an island merge while we are doing epoll_wait() above */ - PI_UNREF(pi, "ps_work"); + PI_UNREF(exec_ctx, pi, "ps_work"); GPR_TIMER_END("pollset_work_and_unlock", 0); } @@ -1517,10 +1529,11 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&pollset->mu); - gpr_mu_lock(&fd->pi_mu); + gpr_mu_lock(&fd->mu); polling_island *pi_new = NULL; +retry: /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and * equal, do nothing. * 2) If fd->polling_island and pollset->polling_island are both NULL, create @@ -1535,15 +1548,44 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * polling_island fields in both fd and pollset to point to the merged * polling island. */ + + if (fd->orphaned) { + gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&pollset->mu); + /* early out */ + return; + } + if (fd->polling_island == pollset->polling_island) { pi_new = fd->polling_island; if (pi_new == NULL) { - pi_new = polling_island_create(fd, &error); - - GRPC_POLLING_TRACE( - "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, " - "pollset: %p)", - (void *)pi_new, fd->fd, (void *)pollset); + /* Unlock before creating a new polling island: the polling island will + create a workqueue which creates a file descriptor, and holding an fd + lock here can eventually cause a loop to appear to TSAN (making it + unhappy). We don't think it's a real loop (there's an epoch point where + that loop possibility disappears), but the advantages of keeping TSAN + happy outweigh any performance advantage we might have by keeping the + lock held. */ + gpr_mu_unlock(&fd->mu); + pi_new = polling_island_create(exec_ctx, fd, &error); + gpr_mu_lock(&fd->mu); + /* Need to reverify any assumptions made between the initial lock and + getting to this branch: if they've changed, we need to throw away our + work and figure things out again. */ + if (fd->polling_island != NULL) { + GRPC_POLLING_TRACE( + "pollset_add_fd: Raced creating new polling island. pi_new: %p " + "(fd: %d, pollset: %p)", + (void *)pi_new, fd->fd, (void *)pollset); + PI_ADD_REF(pi_new, "dance_of_destruction"); + PI_UNREF(exec_ctx, pi_new, "dance_of_destruction"); + goto retry; + } else { + GRPC_POLLING_TRACE( + "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, " + "pollset: %p)", + (void *)pi_new, fd->fd, (void *)pollset); + } } } else if (fd->polling_island == NULL) { pi_new = polling_island_lock(pollset->polling_island); @@ -1579,7 +1621,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (fd->polling_island != pi_new) { PI_ADD_REF(pi_new, "fd"); if (fd->polling_island != NULL) { - PI_UNREF(fd->polling_island, "fd"); + PI_UNREF(exec_ctx, fd->polling_island, "fd"); } fd->polling_island = pi_new; } @@ -1587,13 +1629,15 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->polling_island != pi_new) { PI_ADD_REF(pi_new, "ps"); if (pollset->polling_island != NULL) { - PI_UNREF(pollset->polling_island, "ps"); + PI_UNREF(exec_ctx, pollset->polling_island, "ps"); } pollset->polling_island = pi_new; } - gpr_mu_unlock(&fd->pi_mu); + gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&pollset->mu); + + GRPC_LOG_IF_ERROR("pollset_add_fd", error); } /******************************************************************************* @@ -1744,9 +1788,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, void *grpc_fd_get_polling_island(grpc_fd *fd) { polling_island *pi; - gpr_mu_lock(&fd->pi_mu); + gpr_mu_lock(&fd->mu); pi = fd->polling_island; - gpr_mu_unlock(&fd->pi_mu); + gpr_mu_unlock(&fd->mu); return pi; } @@ -1794,6 +1838,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 9e306af5fa..c2107e5e39 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -725,6 +725,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, GRPC_FD_UNREF(fd, "poll"); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } + /******************************************************************************* * pollset_posix.c */ @@ -2006,6 +2008,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 45c0a5e954..16a5e3083e 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -617,6 +617,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, GRPC_FD_UNREF(fd, "poll"); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } + /******************************************************************************* * pollset_posix.c */ @@ -842,6 +844,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, *worker_hdl = &worker; grpc_error *error = GRPC_ERROR_NONE; + /* Avoid malloc for small number of elements. */ + enum { inline_elements = 96 }; + struct pollfd pollfd_space[inline_elements]; + struct grpc_fd_watcher watcher_space[inline_elements]; + /* pollset->mu already held */ int added_worker = 0; int locked = 1; @@ -897,15 +904,23 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int r; size_t i, fd_count; nfds_t pfd_count; - /* TODO(ctiller): inline some elements to avoid an allocation */ grpc_fd_watcher *watchers; struct pollfd *pfds; timeout = poll_deadline_to_millis_timeout(deadline, now); - /* TODO(ctiller): perform just one malloc here if we exceed the inline - * case */ - pfds = gpr_malloc(sizeof(*pfds) * (pollset->fd_count + 2)); - watchers = gpr_malloc(sizeof(*watchers) * (pollset->fd_count + 2)); + + if (pollset->fd_count + 2 <= inline_elements) { + pfds = pollfd_space; + watchers = watcher_space; + } else { + /* Allocate one buffer to hold both pfds and watchers arrays */ + const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2); + const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2); + void *buf = gpr_malloc(pfd_size + watch_size); + pfds = buf; + watchers = (void *)((char *)buf + pfd_size); + } + fd_count = 0; pfd_count = 2; pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); @@ -972,8 +987,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } - gpr_free(pfds); - gpr_free(watchers); + if (pfds != pollfd_space) { + /* pfds and watchers are in the same memory block pointed to by pfds */ + gpr_free(pfds); + } + GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; } else { @@ -1234,6 +1252,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index a3c1e9db9a..6536672685 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -148,6 +148,10 @@ grpc_fd *grpc_fd_create(int fd, const char *name) { return g_event_engine->fd_create(fd, name); } +grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) { + return g_event_engine->fd_get_workqueue(fd); +} + int grpc_fd_wrapped_fd(grpc_fd *fd) { return g_event_engine->fd_wrapped_fd(fd); } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 579c84ef70..c2aa1756ea 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -56,6 +56,7 @@ typedef struct grpc_event_engine_vtable { void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); bool (*fd_is_shutdown)(grpc_fd *fd); + grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd); grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, grpc_fd *fd); @@ -107,6 +108,9 @@ const char *grpc_get_poll_strategy_name(); This takes ownership of closing fd. */ grpc_fd *grpc_fd_create(int fd, const char *name); +/* Get a workqueue that's associated with this fd */ +grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd); + /* Return the wrapped fd, or -1 if it has been released or closed. */ int grpc_fd_wrapped_fd(grpc_fd *fd); diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index c44aafcddf..ac7785ec13 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -37,6 +37,7 @@ #include <grpc/support/sync.h> #include <grpc/support/thd.h> +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) { @@ -85,14 +86,17 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null) { - GPR_ASSERT(offload_target_or_null == NULL); - grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + if (offload_target_or_null == NULL) { + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + } else { + grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error); + GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched"); + } } void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, grpc_closure_list *list, grpc_workqueue *offload_target_or_null) { - GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_move(list, &exec_ctx->closure_list); } diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 38f27d9b13..917f332f03 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -93,7 +93,11 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); /** Finish any pending work for a grpc_exec_ctx. Must be called before * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); -/** Add a closure to be executed at the next flush/finish point */ +/** Add a closure to be executed in the future. + If \a offload_target_or_null is NULL, the closure will be executed at the + next exec_ctx.{finish,flush} point. + If \a offload_target_or_null is non-NULL, the closure will be scheduled + against the workqueue, and a reference to the workqueue will be consumed. */ void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null); diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 89292a153e..d67d388b8c 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -45,6 +45,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" @@ -62,6 +63,7 @@ void grpc_iomgr_init(void) { grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; + grpc_network_status_init(); grpc_iomgr_platform_init(); } @@ -140,6 +142,7 @@ void grpc_iomgr_shutdown(void) { grpc_iomgr_platform_shutdown(); grpc_exec_ctx_global_shutdown(); + grpc_network_status_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); } diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c index ccbe136db9..b4bb7e3cf7 100644 --- a/src/core/lib/iomgr/network_status_tracker.c +++ b/src/core/lib/iomgr/network_status_tracker.c @@ -42,9 +42,8 @@ typedef struct endpoint_ll_node { static endpoint_ll_node *head = NULL; static gpr_mu g_endpoint_mutex; -static gpr_once g_once_init = GPR_ONCE_INIT; -static void destroy_network_status_monitor(void) { +void grpc_network_status_shutdown(void) { if (head != NULL) { gpr_log(GPR_ERROR, "Memory leaked as all network endpoints were not shut down"); @@ -52,14 +51,21 @@ static void destroy_network_status_monitor(void) { gpr_mu_destroy(&g_endpoint_mutex); } -static void initialize_network_status_monitor(void) { +void grpc_network_status_init(void) { gpr_mu_init(&g_endpoint_mutex); - atexit(destroy_network_status_monitor); // TODO(makarandd): Install callback with OS to monitor network status. } +void grpc_destroy_network_status_monitor() { + for (endpoint_ll_node *curr = head; curr != NULL;) { + endpoint_ll_node *next = curr->next; + gpr_free(curr); + curr = next; + } + gpr_mu_destroy(&g_endpoint_mutex); +} + void grpc_network_status_register_endpoint(grpc_endpoint *ep) { - gpr_once_init(&g_once_init, initialize_network_status_monitor); gpr_mu_lock(&g_endpoint_mutex); if (head == NULL) { head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node)); diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h index 74a1aa8135..67cb645f44 100644 --- a/src/core/lib/iomgr/network_status_tracker.h +++ b/src/core/lib/iomgr/network_status_tracker.h @@ -35,7 +35,11 @@ #define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H #include "src/core/lib/iomgr/endpoint.h" +void grpc_network_status_init(void); +void grpc_network_status_shutdown(void); + void grpc_network_status_register_endpoint(grpc_endpoint *ep); void grpc_network_status_unregister_endpoint(grpc_endpoint *ep); void grpc_network_status_shutdown_all_endpoints(); + #endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 2ab45e33ce..ec21e03944 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -284,7 +284,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } /* returns true if done, false if pending; if returning true, *error is set */ -#define MAX_WRITE_IOVEC 16 +#define MAX_WRITE_IOVEC 1024 static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; @@ -450,9 +450,19 @@ static char *tcp_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, - tcp_shutdown, tcp_destroy, tcp_get_peer}; +static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return grpc_fd_get_workqueue(tcp->em_fd); +} + +static const grpc_endpoint_vtable vtable = {tcp_read, + tcp_write, + tcp_get_workqueue, + tcp_add_to_pollset, + tcp_add_to_pollset_set, + tcp_shutdown, + tcp_destroy, + tcp_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, const char *peer_string) { diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 875a6ca14a..5a25d39a0c 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -105,4 +105,8 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, a call (exec_ctx!=NULL) to shutdown_complete. */ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s); +/* Shutdown the fds of listeners. */ +void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, + grpc_tcp_server *s); + #endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */ diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index d3803c3bd0..38ebd2dbcb 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -491,7 +491,8 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { } for (unsigned i = 0; i < count; i++) { - int fd, port; + int fd = -1; + int port = -1; grpc_dualstack_mode dsmode; err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0, &dsmode, &fd); @@ -740,4 +741,17 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } +void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, + grpc_tcp_server *s) { + gpr_mu_lock(&s->mu); + /* shutdown all fd's */ + if (s->active_ports) { + grpc_tcp_listener *sp; + for (sp = s->head; sp; sp = sp->next) { + grpc_fd_shutdown(exec_ctx, sp->emfd); + } + } + gpr_mu_unlock(&s->mu); +} + #endif diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index 7b0966704c..1b125e7005 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -540,4 +540,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, gpr_mu_unlock(&s->mu); } +void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, + grpc_tcp_server *s) {} + #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 37ab59021e..35054c42b5 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -389,9 +389,16 @@ static char *win_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static grpc_endpoint_vtable vtable = { - win_read, win_write, win_add_to_pollset, win_add_to_pollset_set, - win_shutdown, win_destroy, win_get_peer}; +static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; } + +static grpc_endpoint_vtable vtable = {win_read, + win_write, + win_get_workqueue, + win_add_to_pollset, + win_add_to_pollset_set, + win_shutdown, + win_destroy, + win_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 1ebccf2ee2..48032412a2 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -60,6 +60,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -128,7 +129,7 @@ grpc_udp_server *grpc_udp_server_create(void) { } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { - grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1, NULL); + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); gpr_mu_destroy(&s->mu); gpr_cv_destroy(&s->cv); @@ -138,7 +139,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { } static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, - bool success) { + grpc_error *error) { grpc_udp_server *s = server; gpr_mu_lock(&s->mu); s->destroyed_ports++; @@ -217,14 +218,23 @@ static int prepare_socket(int fd, const struct sockaddr *addr, goto error; } - if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1)) { - gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, - strerror(errno)); + if (grpc_set_socket_nonblocking(fd, 1) != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Unable to set nonblocking %d: %s", fd, strerror(errno)); + goto error; + } + if (grpc_set_socket_cloexec(fd, 1) != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Unable to set cloexec %d: %s", fd, strerror(errno)); + goto error; } - if (grpc_set_socket_ip_pktinfo_if_possible(fd) && - addr->sa_family == AF_INET6) { - grpc_set_socket_ipv6_recvpktinfo_if_possible(fd); + if (grpc_set_socket_ip_pktinfo_if_possible(fd) != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Unable to set ip_pktinfo."); + goto error; + } else if (addr->sa_family == AF_INET6) { + if (grpc_set_socket_ipv6_recvpktinfo_if_possible(fd) != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Unable to set ipv6_recvpktinfo."); + goto error; + } } GPR_ASSERT(addr_len < ~(socklen_t)0); @@ -241,13 +251,13 @@ static int prepare_socket(int fd, const struct sockaddr *addr, goto error; } - if (!grpc_set_socket_sndbuf(fd, buffer_size_bytes)) { + if (grpc_set_socket_sndbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes", buffer_size_bytes); goto error; } - if (!grpc_set_socket_rcvbuf(fd, buffer_size_bytes)) { + if (grpc_set_socket_rcvbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes", buffer_size_bytes); goto error; @@ -263,10 +273,10 @@ error: } /* event manager callback when reads are ready */ -static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { server_port *sp = arg; - if (!success) { + if (error != GRPC_ERROR_NONE) { gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); @@ -369,7 +379,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, /* Try listening on IPv6 first. */ addr = (struct sockaddr *)&wild6; addr_len = sizeof(wild6); - fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); + // TODO(rjshade): Test and propagate the returned grpc_error*: + grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd); allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { @@ -384,7 +395,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, addr_len = sizeof(wild4); } - fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); + // TODO(rjshade): Test and propagate the returned grpc_error*: + grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd); if (fd < 0) { gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); } diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 5cc40eea50..7156e490d7 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -38,6 +38,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/pollset_set.h" #ifdef GPR_POSIX_SOCKET #include "src/core/lib/iomgr/workqueue_posix.h" @@ -49,35 +50,45 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ -/** Create a work queue */ -grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, - grpc_workqueue **workqueue); - +/* Deprecated: do not use. + This has *already* been removed in a future commit. */ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); -#define GRPC_WORKQUEUE_REFCOUNT_DEBUG +/* Reference counting functions. Use the macro's always + (GRPC_WORKQUEUE_{REF,UNREF}). + + Pass in a descriptive reason string for reffing/unreffing as the last + argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that + string will be printed alongside the refcount. When it is not defined, the + string will be discarded at compilation time. */ + +//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #define GRPC_WORKQUEUE_REF(p, r) \ - grpc_workqueue_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_WORKQUEUE_UNREF(cl, p, r) \ - grpc_workqueue_unref((cl), (p), __FILE__, __LINE__, (r)) + (grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p)) +#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \ + grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, const char *file, int line, const char *reason); #else -#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p)) +#define GRPC_WORKQUEUE_REF(p, r) (grpc_workqueue_ref((p)), (p)) #define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p)) void grpc_workqueue_ref(grpc_workqueue *workqueue); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); #endif -/** Bind this workqueue to a pollset */ -void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, - grpc_pollset *pollset); +/** Add a work item to a workqueue. Items added to a work queue will be started + in approximately the order they were enqueued, on some thread that may or + may not be the current thread. Successive closures enqueued onto a workqueue + MAY be executed concurrently. + + It is generally more expensive to add a closure to a workqueue than to the + execution context, both in terms of CPU work and in execution latency. -/** Add a work item to a workqueue */ + Use work queues when it's important that other threads be given a chance to + tackle some workload. */ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error); diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 45e0f6063b..e0d6dac230 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -70,7 +70,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, static void workqueue_destroy(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - GPR_ASSERT(grpc_closure_list_empty(workqueue->closure_list)); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd); } @@ -100,12 +100,6 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { } } -void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, - grpc_pollset *pollset) { - grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd); -} - void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_lock(&workqueue->mu); grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h index dcb47e7b59..0f26ba58e2 100644 --- a/src/core/lib/iomgr/workqueue_posix.h +++ b/src/core/lib/iomgr/workqueue_posix.h @@ -50,4 +50,9 @@ struct grpc_workqueue { grpc_closure read_closure; }; +/** Create a work queue. Returns an error if creation fails. If creation + succeeds, sets *workqueue to point to it. */ +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue); + #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */ diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c index 275f040b1c..23e2dea185 100644 --- a/src/core/lib/iomgr/workqueue_windows.c +++ b/src/core/lib/iomgr/workqueue_windows.c @@ -37,4 +37,26 @@ #include "src/core/lib/iomgr/workqueue.h" +// Minimal implementation of grpc_workqueue for Windows +// Works by directly enqueuing workqueue items onto the current execution +// context, which is at least correct, if not performant or in the spirit of +// workqueues. + +void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason) {} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + const char *file, int line, const char *reason) {} +#else +void grpc_workqueue_ref(grpc_workqueue *workqueue) {} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} +#endif + +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); +} + #endif /* GPR_WINDOWS */ diff --git a/src/core/lib/security/transport/handshake.c b/src/core/lib/security/transport/handshake.c index b374ca7633..540a17283d 100644 --- a/src/core/lib/security/transport/handshake.c +++ b/src/core/lib/security/transport/handshake.c @@ -357,8 +357,9 @@ void grpc_do_security_handshake( gpr_mu_unlock(&server_connector->mu); } send_handshake_bytes_to_peer(exec_ctx, h); - grpc_timer_init(exec_ctx, &h->timer, deadline, on_timeout, h, - gpr_now(deadline.clock_type)); + grpc_timer_init(exec_ctx, &h->timer, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + on_timeout, h, gpr_now(GPR_CLOCK_MONOTONIC)); } void grpc_security_handshake_shutdown(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 7650d68e89..bc50f9d1b0 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -360,11 +360,19 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) { return grpc_endpoint_get_peer(ep->wrapped_ep); } -static const grpc_endpoint_vtable vtable = { - endpoint_read, endpoint_write, - endpoint_add_to_pollset, endpoint_add_to_pollset_set, - endpoint_shutdown, endpoint_destroy, - endpoint_get_peer}; +static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + return grpc_endpoint_get_workqueue(ep->wrapped_ep); +} + +static const grpc_endpoint_vtable vtable = {endpoint_read, + endpoint_write, + endpoint_get_workqueue, + endpoint_add_to_pollset, + endpoint_add_to_pollset_set, + endpoint_shutdown, + endpoint_destroy, + endpoint_get_peer}; grpc_endpoint *grpc_secure_endpoint_create( struct tsi_frame_protector *protector, grpc_endpoint *transport, diff --git a/src/core/lib/support/log.c b/src/core/lib/support/log.c index bae0957df7..899f1218b6 100644 --- a/src/core/lib/support/log.c +++ b/src/core/lib/support/log.c @@ -79,17 +79,18 @@ void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) { void gpr_log_verbosity_init() { char *verbosity = gpr_getenv("GRPC_VERBOSITY"); - if (verbosity == NULL) return; - gpr_atm min_severity_to_print = GPR_LOG_VERBOSITY_UNSET; - if (strcmp(verbosity, "DEBUG") == 0) { - min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_DEBUG; - } else if (strcmp(verbosity, "INFO") == 0) { - min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_INFO; - } else if (strcmp(verbosity, "ERROR") == 0) { - min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_ERROR; + gpr_atm min_severity_to_print = GPR_LOG_SEVERITY_ERROR; + if (verbosity != NULL) { + if (strcmp(verbosity, "DEBUG") == 0) { + min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_DEBUG; + } else if (strcmp(verbosity, "INFO") == 0) { + min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_INFO; + } else if (strcmp(verbosity, "ERROR") == 0) { + min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_ERROR; + } + gpr_free(verbosity); } - gpr_free(verbosity); if ((gpr_atm_no_barrier_load(&g_min_severity_to_print)) == GPR_LOG_VERBOSITY_UNSET) { gpr_atm_no_barrier_store(&g_min_severity_to_print, min_severity_to_print); diff --git a/src/core/lib/support/slice.c b/src/core/lib/support/slice.c index b9a7c77bda..8a2c0a9086 100644 --- a/src/core/lib/support/slice.c +++ b/src/core/lib/support/slice.c @@ -94,14 +94,16 @@ static void new_slice_unref(void *p) { } } -gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) { +gpr_slice gpr_slice_new_with_user_data(void *p, size_t len, + void (*destroy)(void *), + void *user_data) { gpr_slice slice; new_slice_refcount *rc = gpr_malloc(sizeof(new_slice_refcount)); gpr_ref_init(&rc->refs, 1); rc->rc.ref = new_slice_ref; rc->rc.unref = new_slice_unref; rc->user_destroy = destroy; - rc->user_data = p; + rc->user_data = user_data; slice.refcount = &rc->rc; slice.data.refcounted.bytes = p; @@ -109,6 +111,11 @@ gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) { return slice; } +gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) { + /* Pass "p" to *destroy when the slice is no longer needed. */ + return gpr_slice_new_with_user_data(p, len, destroy, p); +} + /* gpr_slice_new_with_len support structures - we create a refcount object extended with the user provided data pointer & destroy function */ typedef struct new_with_len_slice_refcount { diff --git a/src/core/lib/support/time.c b/src/core/lib/support/time.c index 57f8331194..5a7d043aed 100644 --- a/src/core/lib/support/time.c +++ b/src/core/lib/support/time.c @@ -80,103 +80,67 @@ gpr_timespec gpr_inf_past(gpr_clock_type type) { return out; } -/* TODO(ctiller): consider merging _nanos, _micros, _millis into a single - function for maintainability. Similarly for _seconds, _minutes, and _hours */ - -gpr_timespec gpr_time_from_nanos(int64_t ns, gpr_clock_type type) { - gpr_timespec result; - result.clock_type = type; - if (ns == INT64_MAX) { - result = gpr_inf_future(type); - } else if (ns == INT64_MIN) { - result = gpr_inf_past(type); - } else if (ns >= 0) { - result.tv_sec = ns / GPR_NS_PER_SEC; - result.tv_nsec = (int32_t)(ns - result.tv_sec * GPR_NS_PER_SEC); +static gpr_timespec to_seconds_from_sub_second_time(int64_t time_in_units, + int64_t units_per_sec, + gpr_clock_type type) { + gpr_timespec out; + if (time_in_units == INT64_MAX) { + out = gpr_inf_future(type); + } else if (time_in_units == INT64_MIN) { + out = gpr_inf_past(type); } else { - /* Calculation carefully formulated to avoid any possible under/overflow. */ - result.tv_sec = (-(999999999 - (ns + GPR_NS_PER_SEC)) / GPR_NS_PER_SEC) - 1; - result.tv_nsec = (int32_t)(ns - result.tv_sec * GPR_NS_PER_SEC); + if (time_in_units >= 0) { + out.tv_sec = time_in_units / units_per_sec; + } else { + out.tv_sec = (-((units_per_sec - 1) - (time_in_units + units_per_sec)) / + units_per_sec) - + 1; + } + out.tv_nsec = (int32_t)((time_in_units - out.tv_sec * units_per_sec) * + GPR_NS_PER_SEC / units_per_sec); + out.clock_type = type; } - return result; + return out; } -gpr_timespec gpr_time_from_micros(int64_t us, gpr_clock_type type) { - gpr_timespec result; - result.clock_type = type; - if (us == INT64_MAX) { - result = gpr_inf_future(type); - } else if (us == INT64_MIN) { - result = gpr_inf_past(type); - } else if (us >= 0) { - result.tv_sec = us / 1000000; - result.tv_nsec = (int32_t)((us - result.tv_sec * 1000000) * 1000); +static gpr_timespec to_seconds_from_above_second_time(int64_t time_in_units, + int64_t secs_per_unit, + gpr_clock_type type) { + gpr_timespec out; + if (time_in_units >= INT64_MAX / secs_per_unit) { + out = gpr_inf_future(type); + } else if (time_in_units <= INT64_MIN / secs_per_unit) { + out = gpr_inf_past(type); } else { - /* Calculation carefully formulated to avoid any possible under/overflow. */ - result.tv_sec = (-(999999 - (us + 1000000)) / 1000000) - 1; - result.tv_nsec = (int32_t)((us - result.tv_sec * 1000000) * 1000); + out.tv_sec = time_in_units * secs_per_unit; + out.tv_nsec = 0; + out.clock_type = type; } - return result; + return out; +} + +gpr_timespec gpr_time_from_nanos(int64_t ns, gpr_clock_type type) { + return to_seconds_from_sub_second_time(ns, GPR_NS_PER_SEC, type); +} + +gpr_timespec gpr_time_from_micros(int64_t us, gpr_clock_type type) { + return to_seconds_from_sub_second_time(us, GPR_US_PER_SEC, type); } gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type type) { - gpr_timespec result; - result.clock_type = type; - if (ms == INT64_MAX) { - result = gpr_inf_future(type); - } else if (ms == INT64_MIN) { - result = gpr_inf_past(type); - } else if (ms >= 0) { - result.tv_sec = ms / 1000; - result.tv_nsec = (int32_t)((ms - result.tv_sec * 1000) * 1000000); - } else { - /* Calculation carefully formulated to avoid any possible under/overflow. */ - result.tv_sec = (-(999 - (ms + 1000)) / 1000) - 1; - result.tv_nsec = (int32_t)((ms - result.tv_sec * 1000) * 1000000); - } - return result; + return to_seconds_from_sub_second_time(ms, GPR_MS_PER_SEC, type); } gpr_timespec gpr_time_from_seconds(int64_t s, gpr_clock_type type) { - gpr_timespec result; - result.clock_type = type; - if (s == INT64_MAX) { - result = gpr_inf_future(type); - } else if (s == INT64_MIN) { - result = gpr_inf_past(type); - } else { - result.tv_sec = s; - result.tv_nsec = 0; - } - return result; + return to_seconds_from_sub_second_time(s, 1, type); } gpr_timespec gpr_time_from_minutes(int64_t m, gpr_clock_type type) { - gpr_timespec result; - result.clock_type = type; - if (m >= INT64_MAX / 60) { - result = gpr_inf_future(type); - } else if (m <= INT64_MIN / 60) { - result = gpr_inf_past(type); - } else { - result.tv_sec = m * 60; - result.tv_nsec = 0; - } - return result; + return to_seconds_from_above_second_time(m, 60, type); } gpr_timespec gpr_time_from_hours(int64_t h, gpr_clock_type type) { - gpr_timespec result; - result.clock_type = type; - if (h >= INT64_MAX / 3600) { - result = gpr_inf_future(type); - } else if (h <= INT64_MIN / 3600) { - result = gpr_inf_past(type); - } else { - result.tv_sec = h * 3600; - result.tv_nsec = 0; - } - return result; + return to_seconds_from_above_second_time(h, 3600, type); } gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) { diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 32913ad4e8..a482ba43d8 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -73,6 +73,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct requested_call { requested_call_type type; + size_t cq_idx; void *tag; grpc_server *server; grpc_completion_queue *cq_bound_to_call; @@ -206,11 +207,11 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; - /** free list of available requested_calls indices */ - gpr_stack_lockfree *request_freelist; + /** free list of available requested_calls_per_cq indices */ + gpr_stack_lockfree **request_freelist_per_cq; /** requested call backing data */ - requested_call *requested_calls; - size_t max_requested_calls; + requested_call **requested_calls_per_cq; + int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -357,7 +358,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, for (size_t i = 0; i < server->cq_count; i++) { while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != -1) { - fail_call(exec_ctx, server, i, &server->requested_calls[request_id], + fail_call(exec_ctx, server, i, + &server->requested_calls_per_cq[i][request_id], GRPC_ERROR_REF(error)); } } @@ -392,12 +394,16 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); + if (server->started) { + gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); + gpr_free(server->requested_calls_per_cq[i]); + } } - gpr_stack_lockfree_destroy(server->request_freelist); + gpr_free(server->request_freelist_per_cq); + gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); - gpr_free(server->requested_calls); gpr_free(server); } @@ -460,11 +466,13 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, requested_call *rc = req; grpc_server *server = rc->server; - if (rc >= server->requested_calls && - rc < server->requested_calls + server->max_requested_calls) { - GPR_ASSERT(rc - server->requested_calls <= INT_MAX); - gpr_stack_lockfree_push(server->request_freelist, - (int)(rc - server->requested_calls)); + if (rc >= server->requested_calls_per_cq[rc->cq_idx] && + rc < server->requested_calls_per_cq[rc->cq_idx] + + server->max_requested_calls_per_cq) { + GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); + gpr_stack_lockfree_push( + server->request_freelist_per_cq[rc->cq_idx], + (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); } else { gpr_free(req); } @@ -540,7 +548,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls[request_id]); + &server->requested_calls_per_cq[cq_idx][request_id]); return; /* early out */ } } @@ -980,8 +988,6 @@ void grpc_server_register_non_listening_completion_queue( } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { - size_t i; - GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); grpc_server *server = gpr_malloc(sizeof(grpc_server)); @@ -999,15 +1005,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { &server->root_channel_data; /* TODO(ctiller): expose a channel_arg for this */ - server->max_requested_calls = 32768; - server->request_freelist = - gpr_stack_lockfree_create(server->max_requested_calls); - for (i = 0; i < (size_t)server->max_requested_calls; i++) { - gpr_stack_lockfree_push(server->request_freelist, (int)i); - } - server->requested_calls = gpr_malloc(server->max_requested_calls * - sizeof(*server->requested_calls)); - + server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1067,16 +1065,28 @@ void grpc_server_start(grpc_server *server) { server->started = true; size_t pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); + server->request_freelist_per_cq = + gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); + server->requested_calls_per_cq = + gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]); } + server->request_freelist_per_cq[i] = + gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); + for (int j = 0; j < server->max_requested_calls_per_cq; j++) { + gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); + } + server->requested_calls_per_cq[i] = + gpr_malloc((size_t)server->max_requested_calls_per_cq * + sizeof(*server->requested_calls_per_cq[i])); } request_matcher_init(&server->unregistered_request_matcher, - server->max_requested_calls, server); + (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, server->max_requested_calls, - server); + request_matcher_init(&rm->request_matcher, + (size_t)server->max_requested_calls_per_cq, server); } for (l = server->listeners; l; l = l->next) { @@ -1308,11 +1318,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GRPC_ERROR_CREATE("Server Shutdown")); return GRPC_CALL_OK; } - request_id = gpr_stack_lockfree_pop(server->request_freelist); + request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); if (request_id == -1) { /* out of request ids: just fail this one */ fail_call(exec_ctx, server, cq_idx, rc, - GRPC_ERROR_CREATE("Server Shutdown")); + grpc_error_set_int(GRPC_ERROR_CREATE("Out of request ids"), + GRPC_ERROR_INT_LIMIT, + server->max_requested_calls_per_cq)); return GRPC_CALL_OK; } switch (rc->type) { @@ -1323,7 +1335,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &rc->data.registered.registered_method->request_matcher; break; } - server->requested_calls[request_id] = *rc; + server->requested_calls_per_cq[cq_idx][request_id] = *rc; gpr_free(rc); if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start @@ -1347,7 +1359,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls[request_id]); + &server->requested_calls_per_cq[cq_idx][request_id]); } gpr_mu_lock(&server->mu_call); } @@ -1383,6 +1395,7 @@ grpc_call_error grpc_server_request_call( } grpc_cq_begin_op(cq_for_notification, tag); details->reserved = NULL; + rc->cq_idx = cq_idx; rc->type = BATCH_CALL; rc->server = server; rc->tag = tag; @@ -1431,6 +1444,7 @@ grpc_call_error grpc_server_request_registered_call( goto done; } grpc_cq_begin_op(cq_for_notification, tag); + rc->cq_idx = cq_idx; rc->type = REGISTERED_CALL; rc->server = server; rc->tag = tag; diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index 53f3c43854..1942075054 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -36,4 +36,4 @@ #include <grpc/grpc.h> -const char *grpc_version_string(void) { return "0.16.0-dev"; } +const char *grpc_version_string(void) { return "1.1.0-dev"; } diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 054f112127..68d05e3a85 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -179,6 +179,9 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; + if (grpc_connectivity_state_trace) { + gpr_log(GPR_DEBUG, "NOTIFY: %p", w->notify); + } grpc_exec_ctx_sched(exec_ctx, w->notify, GRPC_ERROR_REF(tracker->current_error), NULL); gpr_free(w); diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 08c0a237c9..e33fc5c761 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -138,7 +138,7 @@ typedef struct grpc_transport_stream_op { /** If != GRPC_ERROR_NONE, cancel this stream */ grpc_error *cancel_error; - /** If != GRPC_ERROR, send grpc-status, grpc-message, and close this + /** If != GRPC_ERROR_NONE, send grpc-status, grpc-message, and close this stream for both reading and writing */ grpc_error *close_error; |