From 9f340958f6d131ae7d510270f1f60df7b277e994 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 11 Jul 2016 15:15:40 -0700 Subject: Remove unnecessary parsing detail --- src/core/ext/transport/chttp2/transport/chttp2_transport.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'src/core/ext/transport/chttp2') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 38e782b9b4..01a19eca05 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1764,6 +1764,7 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = arg; + grpc_error *err = GRPC_ERROR_NONE; GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, @@ -1772,15 +1773,13 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing, t->read_buffer.slices[i]); }; - if (i != t->read_buffer.count) { + if (errors[1] == GRPC_ERROR_NONE) { + err = GRPC_ERROR_REF(error); + } else { errors[2] = try_http_parsing(exec_ctx, t); + err = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, + GPR_ARRAY_SIZE(errors)); } - grpc_error *err = - errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE && - errors[2] == GRPC_ERROR_NONE - ? GRPC_ERROR_NONE - : GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, - GPR_ARRAY_SIZE(errors)); for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) { GRPC_ERROR_UNREF(errors[i]); } -- cgit v1.2.3 From 9275d4067cfd9233edf2ae9a0fe1255c8c730f19 Mon Sep 17 00:00:00 2001 From: yang-g Date: Mon, 11 Jul 2016 16:51:39 -0700 Subject: Shutdown the listeners early when destroying the tcp_server --- .../ext/transport/chttp2/server/insecure/server_chttp2.c | 1 + .../transport/chttp2/server/secure/server_secure_chttp2.c | 1 + src/core/lib/iomgr/tcp_server.h | 4 ++++ src/core/lib/iomgr/tcp_server_posix.c | 13 +++++++++++++ src/core/lib/iomgr/tcp_server_windows.c | 3 +++ 5 files changed, 22 insertions(+) (limited to 'src/core/ext/transport/chttp2') diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index e5c987925c..054f15c8ff 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -74,6 +74,7 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_closure *destroy_done) { grpc_tcp_server *tcp = tcpp; + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); grpc_tcp_server_unref(exec_ctx, tcp); grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index c42810e913..7131cf5972 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -166,6 +166,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, state->destroy_callback = callback; tcp = state->tcp; gpr_mu_unlock(&state->mu); + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); grpc_tcp_server_unref(exec_ctx, tcp); } diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 875a6ca14a..5a25d39a0c 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -105,4 +105,8 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, a call (exec_ctx!=NULL) to shutdown_complete. */ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s); +/* Shutdown the fds of listeners. */ +void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, + grpc_tcp_server *s); + #endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */ diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index d3803c3bd0..7b713723ce 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -740,4 +740,17 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } +void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, + grpc_tcp_server *s) { + gpr_mu_lock(&s->mu); + /* shutdown all fd's */ + if (s->active_ports) { + grpc_tcp_listener *sp; + for (sp = s->head; sp; sp = sp->next) { + grpc_fd_shutdown(exec_ctx, sp->emfd); + } + } + gpr_mu_unlock(&s->mu); +} + #endif diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index 7b0966704c..1b125e7005 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -540,4 +540,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, gpr_mu_unlock(&s->mu); } +void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, + grpc_tcp_server *s) {} + #endif /* GPR_WINSOCK_SOCKET */ -- cgit v1.2.3 From dba5d2708c6d59a73b65d378440dc64b3af2b873 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 14 Jul 2016 13:45:05 -0700 Subject: Use handshake_manager API in client and server. --- .../chttp2/client/insecure/channel_create.c | 34 +++++++++++++----- .../chttp2/client/secure/secure_channel_create.c | 28 +++++++++++---- .../chttp2/server/insecure/server_chttp2.c | 42 ++++++++++++++++++---- .../chttp2/server/secure/server_secure_chttp2.c | 32 ++++++++++++++--- src/core/lib/channel/handshaker.h | 2 ++ 5 files changed, 111 insertions(+), 27 deletions(-) (limited to 'src/core/ext/transport/chttp2') diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 85f9efb3b6..648a9d90a5 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -45,6 +45,7 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/compress_filter.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/surface/api_trace.h" @@ -63,6 +64,8 @@ typedef struct { grpc_endpoint *tcp; grpc_closure connected; + + grpc_handshake_manager *handshake_mgr; } connector; static void connector_ref(grpc_connector *con) { @@ -74,6 +77,7 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { /* c->initial_string_buffer does not need to be destroyed */ + grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); gpr_free(c); } } @@ -83,6 +87,19 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, connector_unref(exec_ctx, arg); } +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + void *arg) { + connector *c = arg; + c->result->transport = + grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, endpoint, 1); + GPR_ASSERT(c->result->transport); + grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); + c->result->channel_args = grpc_channel_args_copy(c->args.channel_args); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL); +} + static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; grpc_closure *notify; @@ -97,19 +114,17 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector_ref(arg); grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, &c->initial_string_sent); + } else { + grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, tcp, + c->args.deadline, on_handshake_done, + c); } - c->result->transport = - grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, tcp, 1); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, - 0); - GPR_ASSERT(c->result->transport); - c->result->channel_args = grpc_channel_args_copy(c->args.channel_args); } else { memset(c->result, 0, sizeof(*c->result)); + notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } - notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {} @@ -171,6 +186,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel( memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); + c->handshake_mgr = grpc_handshake_manager_create(); args->args = final_args; s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 721ba82d8f..30477ded1a 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -44,6 +44,7 @@ #include "src/core/ext/client_config/resolver_registry.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" @@ -69,6 +70,8 @@ typedef struct { grpc_endpoint *newly_connecting_endpoint; grpc_closure connected_closure; + + grpc_handshake_manager *handshake_mgr; } connector; static void connector_ref(grpc_connector *con) { @@ -80,6 +83,7 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { /* c->initial_string_buffer does not need to be destroyed */ + grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); gpr_free(c); } } @@ -116,12 +120,23 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL); } +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + void *arg) { + connector *c = arg; + // TODO(roth, jboeuf): Convert security connector handshaking to use new + // handshake API, and then move the code from on_secure_handshake_done() + // into this function. + grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector, + endpoint, c->args.deadline, + on_secure_handshake_done, c); +} + static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_channel_security_connector_do_handshake( - exec_ctx, c->security_connector, c->connecting_endpoint, c->args.deadline, - on_secure_handshake_done, c); + grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, + c->connecting_endpoint, c->args.deadline, + on_handshake_done, c); } static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -142,9 +157,9 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, &c->initial_string_sent); } else { - grpc_channel_security_connector_do_handshake( - exec_ctx, c->security_connector, tcp, c->args.deadline, - on_secure_handshake_done, c); + grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, tcp, + c->args.deadline, on_handshake_done, + c); } } else { memset(c->result, 0, sizeof(*c->result)); @@ -227,6 +242,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel( memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; + c->handshake_mgr = grpc_handshake_manager_create(); gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->args = final_args; diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index e5c987925c..d3748f576d 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -38,15 +38,23 @@ #include #include #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -static void new_transport(grpc_exec_ctx *exec_ctx, void *server, - grpc_endpoint *tcp, grpc_pollset *accepting_pollset, - grpc_tcp_server_acceptor *acceptor) { +typedef struct server_connect_state { + grpc_server *server; + grpc_pollset *accepting_pollset; + grpc_tcp_server_acceptor *acceptor; + grpc_handshake_manager *handshake_mgr; +} server_connect_state; + +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + void *arg) { + server_connect_state *state = arg; /* * Beware that the call to grpc_create_chttp2_transport() has to happen before * grpc_tcp_server_destroy(). This is fine here, but similar code @@ -55,17 +63,37 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server, * case. */ grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args(server), tcp, 0); - grpc_server_setup_transport(exec_ctx, server, transport, accepting_pollset, - grpc_server_get_channel_args(server)); + exec_ctx, grpc_server_get_channel_args(state->server), endpoint, 0); + grpc_server_setup_transport(exec_ctx, state->server, transport, + state->accepting_pollset, + grpc_server_get_channel_args(state->server)); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); + // Clean up. + grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); + gpr_free(state); +} + +static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp, + grpc_pollset *accepting_pollset, + grpc_tcp_server_acceptor *acceptor) { + server_connect_state *state = gpr_malloc(sizeof(server_connect_state)); + state->server = server; + state->accepting_pollset = accepting_pollset; + state->acceptor = acceptor; + state->handshake_mgr = grpc_handshake_manager_create(); + // TODO(roth): We should really get this timeout value from channel + // args instead of hard-coding it. + const gpr_timespec deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); + grpc_handshake_manager_do_handshake(exec_ctx, state->handshake_mgr, tcp, + deadline, on_handshake_done, state); } /* Server callback: start listening on our ports */ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_pollset **pollsets, size_t pollset_count) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, new_transport, + grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept, server); } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index c42810e913..57931bdb43 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -42,6 +42,7 @@ #include #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -68,6 +69,10 @@ typedef struct server_secure_state { typedef struct server_secure_connect { server_secure_state *state; grpc_pollset *accepting_pollset; + grpc_tcp_server_acceptor *acceptor; + gpr_timespec deadline; // FIXME: remove when we eliminate + // grpc_server_security_connector_do_handshake() + grpc_handshake_manager *handshake_mgr; } server_secure_connect; static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); } @@ -122,6 +127,19 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, gpr_free(state); } +static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, + void *arg) { + server_secure_connect *state = arg; + grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); + state->handshake_mgr = NULL; + // TODO(roth, jboeuf): Convert security connector handshaking to use new + // handshake API, and then move the code from on_secure_handshake_done() + // into this function. + grpc_server_security_connector_do_handshake( + exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline, + on_secure_handshake_done, state); +} + static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { @@ -129,11 +147,15 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, state->state = statep; state_ref(state->state); state->accepting_pollset = accepting_pollset; - grpc_server_security_connector_do_handshake( - exec_ctx, state->state->sc, acceptor, tcp, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(120, GPR_TIMESPAN)), - on_secure_handshake_done, state); + state->acceptor = acceptor; + // TODO(roth): We should really get this timeout value from channel + // args instead of hard-coding it. + state->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(120, GPR_TIMESPAN)); + state->handshake_mgr = grpc_handshake_manager_create(); + grpc_handshake_manager_do_handshake(exec_ctx, state->handshake_mgr, tcp, + state->deadline, on_handshake_done, + state); } /* Server callback: start listening on our ports */ diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index 8cb6fdbc80..9d084f0717 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -40,6 +40,8 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" +// FIXME: high-level documentation + // // grpc_handshaker -- API for initial handshaking for a new connection // -- cgit v1.2.3 From 45015dc8dac51979fe7bffdc20a92e895e57180e Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 15 Jul 2016 08:48:25 -0700 Subject: Plumb channel args through handshakers. --- .../chttp2/client/insecure/channel_create.c | 12 ++-- .../chttp2/client/secure/secure_channel_create.c | 21 ++++--- .../chttp2/server/insecure/server_chttp2.c | 10 +++- .../chttp2/server/secure/server_secure_chttp2.c | 31 +++++----- src/core/lib/channel/channel_args.h | 2 + src/core/lib/channel/handshaker.c | 35 ++++++----- src/core/lib/channel/handshaker.h | 67 ++++++++++++++++------ 7 files changed, 116 insertions(+), 62 deletions(-) (limited to 'src/core/ext/transport/chttp2') diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 648a9d90a5..154c4493ff 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -88,13 +88,13 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, } static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - void *arg) { - connector *c = arg; + grpc_channel_args* args, void *user_data) { + connector *c = user_data; c->result->transport = - grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, endpoint, 1); + grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1); GPR_ASSERT(c->result->transport); grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); - c->result->channel_args = grpc_channel_args_copy(c->args.channel_args); + c->result->channel_args = args; grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL); @@ -102,7 +102,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_closure *notify; grpc_endpoint *tcp = c->tcp; if (tcp != NULL) { if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) { @@ -116,12 +115,13 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { &c->initial_string_sent); } else { grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, tcp, + c->args.channel_args, c->args.deadline, on_handshake_done, c); } } else { memset(c->result, 0, sizeof(*c->result)); - notify = c->notify; + grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 30477ded1a..f071e31cb3 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -72,6 +72,9 @@ typedef struct { grpc_closure connected_closure; grpc_handshake_manager *handshake_mgr; + + // TODO(roth): Remove once we eliminate on_secure_handshake_done(). + grpc_channel_args* tmp_args; } connector; static void connector_ref(grpc_connector *con) { @@ -83,6 +86,7 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { /* c->initial_string_buffer does not need to be destroyed */ + grpc_channel_args_destroy(c->tmp_args); grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); gpr_free(c); } @@ -93,7 +97,6 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *secure_endpoint, grpc_auth_context *auth_context) { connector *c = arg; - grpc_closure *notify; gpr_mu_lock(&c->mu); if (c->connecting_endpoint == NULL) { memset(c->result, 0, sizeof(*c->result)); @@ -113,19 +116,20 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, 0); auth_context_arg = grpc_auth_context_to_arg(auth_context); c->result->channel_args = grpc_channel_args_copy_and_add( - c->args.channel_args, &auth_context_arg, 1); + c->tmp_args, &auth_context_arg, 1); } - notify = c->notify; + grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL); } static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - void *arg) { - connector *c = arg; + grpc_channel_args* args, void *user_data) { + connector *c = user_data; // TODO(roth, jboeuf): Convert security connector handshaking to use new // handshake API, and then move the code from on_secure_handshake_done() // into this function. + c->tmp_args = args; grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector, endpoint, c->args.deadline, on_secure_handshake_done, c); @@ -135,13 +139,13 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, - c->connecting_endpoint, c->args.deadline, + c->connecting_endpoint, + c->args.channel_args, c->args.deadline, on_handshake_done, c); } static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_closure *notify; grpc_endpoint *tcp = c->newly_connecting_endpoint; if (tcp != NULL) { gpr_mu_lock(&c->mu); @@ -158,12 +162,13 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { &c->initial_string_sent); } else { grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, tcp, + c->args.channel_args, c->args.deadline, on_handshake_done, c); } } else { memset(c->result, 0, sizeof(*c->result)); - notify = c->notify; + grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index d3748f576d..920875f694 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -37,7 +37,9 @@ #include #include #include + #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -53,8 +55,8 @@ typedef struct server_connect_state { } server_connect_state; static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - void *arg) { - server_connect_state *state = arg; + grpc_channel_args* args, void *user_data) { + server_connect_state *state = user_data; /* * Beware that the call to grpc_create_chttp2_transport() has to happen before * grpc_tcp_server_destroy(). This is fine here, but similar code @@ -63,12 +65,13 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, * case. */ grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args(state->server), endpoint, 0); + exec_ctx, args, endpoint, 0); grpc_server_setup_transport(exec_ctx, state->server, transport, state->accepting_pollset, grpc_server_get_channel_args(state->server)); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); // Clean up. + grpc_channel_args_destroy(args); grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); gpr_free(state); } @@ -86,6 +89,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp, const gpr_timespec deadline = gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); grpc_handshake_manager_do_handshake(exec_ctx, state->handshake_mgr, tcp, + grpc_server_get_channel_args(server), deadline, on_handshake_done, state); } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 57931bdb43..e3184bc1f9 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -70,9 +70,11 @@ typedef struct server_secure_connect { server_secure_state *state; grpc_pollset *accepting_pollset; grpc_tcp_server_acceptor *acceptor; - gpr_timespec deadline; // FIXME: remove when we eliminate - // grpc_server_security_connector_do_handshake() grpc_handshake_manager *handshake_mgr; + // TODO(roth): Remove the following two fields when we eliminate + // grpc_server_security_connector_do_handshake(). + gpr_timespec deadline; + grpc_channel_args* args; } server_secure_connect; static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); } @@ -102,13 +104,11 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, transport = grpc_create_chttp2_transport( exec_ctx, grpc_server_get_channel_args(state->state->server), secure_endpoint, 0); - grpc_channel_args *args_copy; grpc_arg args_to_add[2]; args_to_add[0] = grpc_server_credentials_to_arg(state->state->creds); args_to_add[1] = grpc_auth_context_to_arg(auth_context); - args_copy = grpc_channel_args_copy_and_add( - grpc_server_get_channel_args(state->state->server), args_to_add, - GPR_ARRAY_SIZE(args_to_add)); + grpc_channel_args *args_copy = grpc_channel_args_copy_and_add( + state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add)); grpc_server_setup_transport(exec_ctx, state->state->server, transport, state->accepting_pollset, args_copy); grpc_channel_args_destroy(args_copy); @@ -123,18 +123,20 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, } else { gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); } + grpc_channel_args_destroy(state->args); state_unref(state->state); gpr_free(state); } static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - void *arg) { - server_secure_connect *state = arg; - grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); - state->handshake_mgr = NULL; + grpc_channel_args* args, void *user_data) { + server_secure_connect *state = user_data; // TODO(roth, jboeuf): Convert security connector handshaking to use new // handshake API, and then move the code from on_secure_handshake_done() // into this function. + grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); + state->handshake_mgr = NULL; + state->args = args; grpc_server_security_connector_do_handshake( exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline, on_secure_handshake_done, state); @@ -148,14 +150,15 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, state_ref(state->state); state->accepting_pollset = accepting_pollset; state->acceptor = acceptor; + state->handshake_mgr = grpc_handshake_manager_create(); // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. state->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); - state->handshake_mgr = grpc_handshake_manager_create(); - grpc_handshake_manager_do_handshake(exec_ctx, state->handshake_mgr, tcp, - state->deadline, on_handshake_done, - state); + grpc_handshake_manager_do_handshake( + exec_ctx, state->handshake_mgr, tcp, + grpc_server_get_channel_args(state->state->server), state->deadline, + on_handshake_done, state); } /* Server callback: start listening on our ports */ diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 653d04f427..aec61ee7c6 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -37,6 +37,8 @@ #include #include +// Channel args are intentionally immutable, to avoid the need for locking. + /** Copy the arguments in \a src into a new instance */ grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src); diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 5b8c08bc15..7dcbe1df9c 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -36,6 +36,7 @@ #include #include +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" // @@ -60,10 +61,11 @@ void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_endpoint* endpoint, + grpc_channel_args* args, gpr_timespec deadline, - grpc_handshaker_done_cb cb, void* arg) { - handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, deadline, cb, - arg); + grpc_handshaker_done_cb cb, void* user_data) { + handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args, + deadline, cb, user_data); } // @@ -76,9 +78,9 @@ struct grpc_handshaker_state { size_t index; // The deadline for all handshakers. gpr_timespec deadline; - // The final callback and arg to invoke after the last handshaker. + // The final callback and user_data to invoke after the last handshaker. grpc_handshaker_done_cb final_cb; - void* final_arg; + void* final_user_data; }; struct grpc_handshake_manager { @@ -126,20 +128,23 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, // A function used as the handshaker-done callback when chaining // handshakers together. static void call_next_handshaker(grpc_exec_ctx* exec_ctx, - grpc_endpoint* endpoint, void* arg) { - grpc_handshake_manager* mgr = arg; + grpc_endpoint* endpoint, + grpc_channel_args* args, + void* user_data) { + grpc_handshake_manager* mgr = user_data; GPR_ASSERT(mgr->state != NULL); GPR_ASSERT(mgr->state->index < mgr->count); grpc_handshaker_done_cb cb = call_next_handshaker; // If this is the last handshaker, use the caller-supplied callback - // and arg instead of chaining back to this function again. + // and user_data instead of chaining back to this function again. if (mgr->state->index == mgr->count - 1) { cb = mgr->state->final_cb; - arg = mgr->state->final_arg; + user_data = mgr->state->final_user_data; } // Invoke handshaker. grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->state->index], - endpoint, mgr->state->deadline, cb, arg); + endpoint, args, mgr->state->deadline, cb, + user_data); ++mgr->state->index; // If this is the last handshaker, clean up state. if (mgr->state->index == mgr->count) { @@ -151,20 +156,22 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void grpc_handshake_manager_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, + const grpc_channel_args* args, gpr_timespec deadline, grpc_handshaker_done_cb cb, - void* arg) { + void* user_data) { + grpc_channel_args* args_copy = grpc_channel_args_copy(args); if (mgr->count == 0) { // No handshakers registered, so we just immediately call the done // callback with the passed-in endpoint. - cb(exec_ctx, endpoint, arg); + cb(exec_ctx, endpoint, args_copy, user_data); } else { GPR_ASSERT(mgr->state == NULL); mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state)); memset(mgr->state, 0, sizeof(*mgr->state)); mgr->state->deadline = deadline; mgr->state->final_cb = cb; - mgr->state->final_arg = arg; - call_next_handshaker(exec_ctx, endpoint, mgr); + mgr->state->final_user_data = user_data; + call_next_handshaker(exec_ctx, endpoint, args_copy, mgr); } } diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index 9d084f0717..6a39529150 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -34,44 +34,62 @@ #ifndef GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H #define GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H +#include #include #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" -// FIXME: high-level documentation +/// Handshakers are used to perform initial handshakes on a connection +/// before the client sends the initial request. Some examples of what +/// a handshaker can be used for includes support for HTTP CONNECT on +/// the client side and various types of security initialization. +/// +/// In general, handshakers should be used via a handshake manager. -// -// grpc_handshaker -- API for initial handshaking for a new connection -// - -// FIXME: document +/// +/// grpc_handshaker +/// typedef struct grpc_handshaker grpc_handshaker; +/// Callback type invoked when a handshaker is done. +/// Takes ownership of \a args. typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx, - grpc_endpoint* endpoint, void* arg); + grpc_endpoint* endpoint, + grpc_channel_args* args, + void* user_data); struct grpc_handshaker_vtable { + /// Destroys the handshaker. void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); + /// Shuts down the handshaker (e.g., to clean up when the operation is + /// aborted in the middle). void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); + /// Performs handshaking. When finished, calls \a cb with \a user_data. + /// Takes ownership of \a args. void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, - grpc_endpoint* endpoint, gpr_timespec deadline, - grpc_handshaker_done_cb cb, void* arg); + grpc_endpoint* endpoint, grpc_channel_args* args, + gpr_timespec deadline, grpc_handshaker_done_cb cb, + void* user_data); }; +/// Base struct. To subclass, make this the first member of the +/// implementation struct. struct grpc_handshaker { const struct grpc_handshaker_vtable* vtable; }; -// Called by concrete implementations to initialize the base struct. +/// Called by concrete implementations to initialize the base struct. void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable, grpc_handshaker* handshaker); -// Convenient wrappers for invoking methods via the vtable. +/// Convenient wrappers for invoking methods via the vtable. +/// These probably do not need to be called from anywhere but +/// grpc_handshake_manager. void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, @@ -79,31 +97,46 @@ void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_endpoint* endpoint, + grpc_channel_args* args, gpr_timespec deadline, - grpc_handshaker_done_cb cb, void* arg); + grpc_handshaker_done_cb cb, void* user_data); -// -// grpc_handshake_manager -- manages a set of handshakers -// +/// +/// grpc_handshake_manager +/// typedef struct grpc_handshake_manager grpc_handshake_manager; +/// Creates a new handshake manager. Caller takes ownership. grpc_handshake_manager* grpc_handshake_manager_create(); -// Handshakers will be invoked in the order added. +/// Adds a handshaker to the handshake manager. +/// Takes ownership of \a mgr. void grpc_handshake_manager_add(grpc_handshaker* handshaker, grpc_handshake_manager* mgr); +/// Destroys the handshake manager. void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr); +/// Shuts down the handshake manager (e.g., to clean up when the operation is +/// aborted in the middle). +/// The caller must still call grpc_handshake_manager_destroy() after +/// calling this function. void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr); +/// Invokes handshakers in the order they were added. +/// Does NOT take ownership of \a args. Instead, makes a copy before +/// invoking the first handshaker. +/// If successful, invokes \a cb with \a user_data after all handshakers +/// have completed. void grpc_handshake_manager_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, + const grpc_channel_args* args, gpr_timespec deadline, - grpc_handshaker_done_cb cb, void* arg); + grpc_handshaker_done_cb cb, + void* user_data); #endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */ -- cgit v1.2.3 From 5ca7e47493899b69126a2ef331936bcba37ee545 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 19 Jul 2016 09:25:49 -0700 Subject: Merge pull request #6737 from ctiller/delayed-write Delay beginning most writes until we enter poll() --- Makefile | 36 -- build.yaml | 14 - .../chttp2/client/secure/secure_channel_create.c | 6 +- .../ext/transport/chttp2/transport/chttp2_plugin.c | 3 + .../transport/chttp2/transport/chttp2_transport.c | 369 +++++++++++++++++---- src/core/ext/transport/chttp2/transport/internal.h | 65 ++-- src/core/ext/transport/chttp2/transport/parsing.c | 11 +- .../ext/transport/chttp2/transport/stream_lists.c | 20 +- src/core/ext/transport/chttp2/transport/writing.c | 16 +- src/core/lib/iomgr/endpoint.c | 4 + src/core/lib/iomgr/endpoint.h | 4 + src/core/lib/iomgr/ev_epoll_linux.c | 277 +++++++++------- src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 3 + src/core/lib/iomgr/ev_poll_posix.c | 3 + src/core/lib/iomgr/ev_posix.c | 4 + src/core/lib/iomgr/ev_posix.h | 4 + src/core/lib/iomgr/exec_ctx.c | 10 +- src/core/lib/iomgr/exec_ctx.h | 6 +- src/core/lib/iomgr/iomgr.c | 3 + src/core/lib/iomgr/network_status_tracker.c | 15 +- src/core/lib/iomgr/network_status_tracker.h | 4 + src/core/lib/iomgr/tcp_posix.c | 18 +- src/core/lib/iomgr/tcp_server_posix.c | 3 +- src/core/lib/iomgr/tcp_windows.c | 13 +- src/core/lib/iomgr/workqueue.h | 39 ++- src/core/lib/iomgr/workqueue_posix.c | 8 +- src/core/lib/iomgr/workqueue_posix.h | 5 + src/core/lib/iomgr/workqueue_windows.c | 22 ++ src/core/lib/security/transport/secure_endpoint.c | 18 +- src/core/lib/surface/server.c | 76 +++-- src/core/lib/transport/connectivity_state.c | 3 + test/core/end2end/tests/high_initial_seqno.c | 6 + test/core/end2end/tests/network_status_change.c | 5 +- test/core/internal_api_canaries/iomgr.c | 13 +- test/core/iomgr/workqueue_test.c | 150 --------- test/core/util/mock_endpoint.c | 12 +- test/core/util/passthru_endpoint.c | 12 +- test/cpp/end2end/end2end_test.cc | 3 + test/cpp/qps/client_async.cc | 35 +- test/cpp/qps/gen_build_yaml.py | 11 +- test/cpp/qps/json_run_localhost.cc | 2 +- test/cpp/qps/server_async.cc | 5 +- tools/dockerfile/test/python_pyenv_x64/Dockerfile | 112 +++++++ tools/run_tests/run_tests.py | 28 +- tools/run_tests/sources_and_headers.json | 16 - tools/run_tests/tests.json | 131 ++++---- 46 files changed, 1001 insertions(+), 622 deletions(-) delete mode 100644 test/core/iomgr/workqueue_test.c create mode 100644 tools/dockerfile/test/python_pyenv_x64/Dockerfile (limited to 'src/core/ext/transport/chttp2') diff --git a/Makefile b/Makefile index 992eee102b..ed362f9752 100644 --- a/Makefile +++ b/Makefile @@ -991,7 +991,6 @@ transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test uri_fuzzer_test: $(BINDIR)/$(CONFIG)/uri_fuzzer_test uri_parser_test: $(BINDIR)/$(CONFIG)/uri_parser_test -workqueue_test: $(BINDIR)/$(CONFIG)/workqueue_test alarm_cpp_test: $(BINDIR)/$(CONFIG)/alarm_cpp_test async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test @@ -1295,7 +1294,6 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/transport_security_test \ $(BINDIR)/$(CONFIG)/udp_server_test \ $(BINDIR)/$(CONFIG)/uri_parser_test \ - $(BINDIR)/$(CONFIG)/workqueue_test \ $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 \ $(BINDIR)/$(CONFIG)/badreq_bad_client_test \ $(BINDIR)/$(CONFIG)/connection_prefix_bad_client_test \ @@ -1674,8 +1672,6 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/udp_server_test || ( echo test udp_server_test failed ; exit 1 ) $(E) "[RUN] Testing uri_parser_test" $(Q) $(BINDIR)/$(CONFIG)/uri_parser_test || ( echo test uri_parser_test failed ; exit 1 ) - $(E) "[RUN] Testing workqueue_test" - $(Q) $(BINDIR)/$(CONFIG)/workqueue_test || ( echo test workqueue_test failed ; exit 1 ) $(E) "[RUN] Testing public_headers_must_be_c89" $(Q) $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 || ( echo test public_headers_must_be_c89 failed ; exit 1 ) $(E) "[RUN] Testing badreq_bad_client_test" @@ -10175,38 +10171,6 @@ endif endif -WORKQUEUE_TEST_SRC = \ - test/core/iomgr/workqueue_test.c \ - -WORKQUEUE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(WORKQUEUE_TEST_SRC)))) -ifeq ($(NO_SECURE),true) - -# You can't build secure targets if you don't have OpenSSL. - -$(BINDIR)/$(CONFIG)/workqueue_test: openssl_dep_error - -else - - - -$(BINDIR)/$(CONFIG)/workqueue_test: $(WORKQUEUE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a - $(E) "[LD] Linking $@" - $(Q) mkdir -p `dirname $@` - $(Q) $(LD) $(LDFLAGS) $(WORKQUEUE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/workqueue_test - -endif - -$(OBJDIR)/$(CONFIG)/test/core/iomgr/workqueue_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a - -deps_workqueue_test: $(WORKQUEUE_TEST_OBJS:.o=.dep) - -ifneq ($(NO_SECURE),true) -ifneq ($(NO_DEPS),true) --include $(WORKQUEUE_TEST_OBJS:.o=.dep) -endif -endif - - ALARM_CPP_TEST_SRC = \ test/cpp/common/alarm_cpp_test.cc \ diff --git a/build.yaml b/build.yaml index c5d92c1e63..006d35245a 100644 --- a/build.yaml +++ b/build.yaml @@ -2430,20 +2430,6 @@ targets: - grpc - gpr_test_util - gpr -- name: workqueue_test - build: test - language: c - src: - - test/core/iomgr/workqueue_test.c - deps: - - grpc_test_util - - grpc - - gpr_test_util - - gpr - platforms: - - mac - - linux - - posix - name: alarm_cpp_test gtest: true build: test diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 721ba82d8f..9acacbd92d 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -91,11 +91,13 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, connector *c = arg; grpc_closure *notify; gpr_mu_lock(&c->mu); + grpc_error *error = GRPC_ERROR_NONE; if (c->connecting_endpoint == NULL) { memset(c->result, 0, sizeof(*c->result)); gpr_mu_unlock(&c->mu); } else if (status != GRPC_SECURITY_OK) { - gpr_log(GPR_ERROR, "Secure handshake failed with error %d.", status); + error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"), + GRPC_ERROR_INT_SECURITY_STATUS, status); memset(c->result, 0, sizeof(*c->result)); c->connecting_endpoint = NULL; gpr_mu_unlock(&c->mu); @@ -113,7 +115,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } notify = c->notify; c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c index bd87253ed3..7d5279b9da 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c @@ -36,11 +36,14 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/metadata.h" +extern int grpc_http_write_state_trace; + void grpc_chttp2_plugin_init(void) { grpc_chttp2_base64_encode_and_huffman_compress = grpc_chttp2_base64_encode_and_huffman_compress_impl; grpc_register_tracer("http", &grpc_http_trace); grpc_register_tracer("flowctl", &grpc_flowctl_trace); + grpc_register_tracer("http_write_state", &grpc_http_write_state_trace); } void grpc_chttp2_plugin_shutdown(void) {} diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index e2dd463a77..d050467a02 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -48,6 +48,7 @@ #include "src/core/ext/transport/chttp2/transport/status_conversion.h" #include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" @@ -60,9 +61,9 @@ #define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) #define MAX_CLIENT_STREAM_ID 0x7fffffffu - int grpc_http_trace = 0; int grpc_flowctl_trace = 0; +int grpc_http_write_state_trace = 0; #define TRANSPORT_FROM_WRITING(tw) \ ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \ @@ -88,10 +89,16 @@ static const grpc_transport_vtable vtable; static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); +static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); + +static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); +static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_error *error); /** Set a transport level setting, and push it to our peer */ -static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, - uint32_t value); +static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_setting_id id, uint32_t value); /** Start disconnection chain */ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -137,7 +144,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); static void incoming_byte_stream_update_flow_control( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already); static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, @@ -201,6 +208,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, gpr_free(t); } +/*#define REFCOUNTING_DEBUG 1*/ #ifdef REFCOUNTING_DEBUG #define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__) #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__) @@ -231,7 +239,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, const grpc_channel_args *channel_args, - grpc_endpoint *ep, uint8_t is_client) { + grpc_endpoint *ep, bool is_client) { size_t i; int j; @@ -273,6 +281,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->writing_action, writing_action, t); grpc_closure_init(&t->reading_action, reading_action, t); grpc_closure_init(&t->parsing_action, parsing_action, t); + grpc_closure_init(&t->initiate_writing, initiate_writing, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); @@ -286,6 +295,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_slice_buffer_add( &t->global.qbuf, gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write"); } /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet @@ -311,11 +321,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* configure http2 the way we like it */ if (is_client) { - push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } - push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW); - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + DEFAULT_WINDOW); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); if (channel_args) { @@ -329,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be an integer", GRPC_ARG_MAX_CONCURRENT_STREAMS); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, (uint32_t)channel_args->args[i].value.integer); } } else if (0 == strcmp(channel_args->args[i].key, @@ -368,7 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be non-negative", GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, (uint32_t)channel_args->args[i].value.integer); } } else if (0 == strcmp(channel_args->args[i].key, @@ -393,7 +404,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be non-negative", GRPC_ARG_MAX_METADATA_SIZE); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, (uint32_t)channel_args->args[i].value.integer); } } @@ -444,6 +455,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { if (!t->closed) { + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "W:%p close transport", t); + } t->closed = 1; connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); @@ -590,7 +604,8 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer_destroy( &s->global.received_trailing_metadata); gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer); - GRPC_ERROR_UNREF(s->global.removal_error); + GRPC_ERROR_UNREF(s->global.read_closed_error); + GRPC_ERROR_UNREF(s->global.write_closed_error); UNREF_TRANSPORT(exec_ctx, t, "stream"); @@ -634,6 +649,36 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( * LOCK MANAGEMENT */ +static const char *write_state_name(grpc_chttp2_write_state state) { + switch (state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + return "INACTIVE"; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + return "REQUESTED[p=0]"; + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + return "REQUESTED[p=1]"; + case GRPC_CHTTP2_WRITE_SCHEDULED: + return "SCHEDULED"; + case GRPC_CHTTP2_WRITING: + return "WRITING"; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + return "WRITING[p=1]"; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + return "WRITING[p=0]"; + } + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} + +static void set_write_state(grpc_chttp2_transport *t, + grpc_chttp2_write_state state, const char *reason) { + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "W:%p %s -> %s because %s", t, + write_state_name(t->executor.write_state), write_state_name(state), + reason); + } + t->executor.write_state = state; +} + static void finish_global_actions(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_executor_action_header *hdr; @@ -642,13 +687,6 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("finish_global_actions", 0); for (;;) { - if (!t->executor.writing_active && !t->closed && - grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { - t->executor.writing_active = 1; - REF_TRANSPORT(t, "writing"); - prevent_endpoint_shutdown(t); - grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); - } check_read_ops(exec_ctx, &t->global); gpr_mu_lock(&t->executor.mu); @@ -669,8 +707,28 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, continue; } else { t->executor.global_active = false; + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking"); + REF_TRANSPORT(t, "initiate_writing"); + gpr_mu_unlock(&t->executor.mu); + grpc_exec_ctx_sched( + exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE, + t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL); + break; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + start_writing(exec_ctx, t); + gpr_mu_unlock(&t->executor.mu); + break; + case GRPC_CHTTP2_WRITING_INACTIVE: + case GRPC_CHTTP2_WRITING: + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + case GRPC_CHTTP2_WRITE_SCHEDULED: + gpr_mu_unlock(&t->executor.mu); + break; + } } - gpr_mu_unlock(&t->executor.mu); break; } @@ -741,16 +799,118 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, * OUTPUT PROCESSING */ -void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { +void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + bool covered_by_poller, const char *reason) { + /* Perform state checks, and transition to a scheduled state if appropriate. + Each time we finish the global lock execution, we check if we need to + write. If we do: + - (if there is a poller surrounding the write) schedule + initiate_writing, which locks and calls initiate_writing_locked to... + - call start_writing, which verifies (under the global lock) that there + are things that need to be written by calling + grpc_chttp2_unlocking_check_writes, and if so schedules writing_action + against the current exec_ctx, to be executed OUTSIDE of the global lock + - eventually writing_action results in grpc_chttp2_terminate_writing being + called, which re-takes the global lock, updates state, checks if we need + to do *another* write immediately, and if so loops back to + start_writing. + + Current problems: + - too much lock entry/exiting + - the writing thread can become stuck indefinitely (punt through the + workqueue periodically to fix) */ + + grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + set_write_state(t, covered_by_poller + ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER + : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + reason); + break; + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + /* nothing to do: write already requested */ + break; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + if (covered_by_poller) { + /* upgrade to note poller is available to cover the write */ + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason); + } + break; + case GRPC_CHTTP2_WRITE_SCHEDULED: + /* nothing to do: write already scheduled */ + break; + case GRPC_CHTTP2_WRITING: + set_write_state(t, + covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER + : GRPC_CHTTP2_WRITING_STALE_NO_POLLER, + reason); + break; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + /* nothing to do: write already requested */ + break; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + if (covered_by_poller) { + /* upgrade to note poller is available to cover the write */ + set_write_state(t, GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, reason); + } + break; + } +} + +static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { + GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED || + t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER); + if (!t->closed && + grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { + set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing"); + REF_TRANSPORT(t, "writing"); + prevent_endpoint_shutdown(t); + grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); + } else { + if (t->closed) { + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, + "start_writing:transport_closed"); + } else { + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, + "start_writing:nothing_to_write"); + } + end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write")); + if (t->ep && !t->endpoint_reading) { + destroy_endpoint(exec_ctx, t); + } + } +} + +static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, + void *arg_ignored) { + start_writing(exec_ctx, t); + UNREF_TRANSPORT(exec_ctx, t, "initiate_writing"); +} + +static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked, + NULL, 0); +} + +void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + bool covered_by_poller, const char *reason) { if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); + grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller, + reason); } } -static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, - uint32_t value) { +static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_setting_id id, uint32_t value) { const grpc_chttp2_setting_parameters *sp = &grpc_chttp2_settings_parameters[id]; uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value); @@ -761,9 +921,22 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) { t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value; t->global.dirtied_local_settings = 1; + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "push_setting"); } } +static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_error *error) { + grpc_chttp2_stream_global *stream_global; + while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, + &stream_global)) { + fail_pending_writes(exec_ctx, &t->global, stream_global, + GRPC_ERROR_REF(error)); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); + } + GRPC_ERROR_UNREF(error); +} + static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, @@ -778,24 +951,32 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); - grpc_chttp2_stream_global *stream_global; - while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, - &stream_global)) { - fail_pending_writes(exec_ctx, &t->global, stream_global, - GRPC_ERROR_REF(error)); - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); + end_waiting_for_write(exec_ctx, t, error); + + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + case GRPC_CHTTP2_WRITE_SCHEDULED: + GPR_UNREACHABLE_CODE(break); + case GRPC_CHTTP2_WRITING: + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing"); + break; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, + "terminate_writing"); + break; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + "terminate_writing"); + break; } - /* leave the writing flag up on shutdown to prevent further writes in - unlock() - from starting */ - t->executor.writing_active = 0; if (t->ep && !t->endpoint_reading) { destroy_endpoint(exec_ctx, t); } UNREF_TRANSPORT(exec_ctx, t, "writing"); - GRPC_ERROR_UNREF(error); } void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, @@ -878,7 +1059,8 @@ static void maybe_start_some_streams( stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->in_stream_map = true; transport_global->concurrent_stream_count++; - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, true, + "new_stream"); } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -1018,9 +1200,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, maybe_start_some_streams(exec_ctx, transport_global); } else { GPR_ASSERT(stream_global->id != 0); - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_initial_metadata"); } } else { + stream_global->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_initial_metadata_finished, @@ -1042,7 +1226,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, } else { stream_global->send_message = op->send_message; if (stream_global->id != 0) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_message"); } } } @@ -1075,6 +1260,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (stream_global->write_closed) { + stream_global->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_trailing_metadata_finished, @@ -1085,7 +1271,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, } else if (stream_global->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_trailing_metadata"); } } } @@ -1106,8 +1293,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, (stream_global->incoming_frames.head == NULL || stream_global->incoming_frames.head->is_tail)) { incoming_byte_stream_update_flow_control( - transport_global, stream_global, transport_global->stream_lookahead, - 0); + exec_ctx, transport_global, stream_global, + transport_global->stream_lookahead, 0); } grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } @@ -1135,7 +1322,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, sizeof(*op)); } -static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { +static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_closure *on_recv) { grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); p->next = &t->global.pings; p->prev = p->next->prev; @@ -1150,6 +1338,7 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { p->id[7] = (uint8_t)(t->global.ping_counter & 0xff); p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); + grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping"); } static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1209,6 +1398,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, close_transport = grpc_chttp2_has_streams(t) ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("GOAWAY sent"); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "goaway_sent"); } if (op->set_accept_stream) { @@ -1226,7 +1416,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->send_ping) { - send_ping_locked(t, op->send_ping); + send_ping_locked(exec_ctx, t, op->send_ping); } if (close_transport != GRPC_ERROR_NONE) { @@ -1414,6 +1604,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, &transport_global->qbuf, grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error, &stream_global->stats.outgoing)); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "rst_stream"); } const char *msg = @@ -1473,10 +1665,39 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, } } +static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) { + if (error == GRPC_ERROR_NONE) return; + for (size_t i = 0; i < *nrefs; i++) { + if (error == refs[i]) { + return; + } + } + refs[*nrefs] = error; + ++*nrefs; +} + +static grpc_error *removal_error(grpc_error *extra_error, + grpc_chttp2_stream_global *stream_global) { + grpc_error *refs[3]; + size_t nrefs = 0; + add_error(stream_global->read_closed_error, refs, &nrefs); + add_error(stream_global->write_closed_error, refs, &nrefs); + add_error(extra_error, refs, &nrefs); + grpc_error *error = GRPC_ERROR_NONE; + if (nrefs > 0) { + error = GRPC_ERROR_CREATE_REFERENCING("Failed due to stream removal", refs, + nrefs); + } + GRPC_ERROR_UNREF(extra_error); + return error; +} + static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_error *error) { + error = removal_error(error, stream_global); + stream_global->send_message = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_initial_metadata_finished, GRPC_ERROR_REF(error)); @@ -1499,14 +1720,17 @@ void grpc_chttp2_mark_stream_closed( } grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); if (close_reads && !stream_global->read_closed) { + stream_global->read_closed_error = GRPC_ERROR_REF(error); stream_global->read_closed = true; stream_global->published_initial_metadata = true; stream_global->published_trailing_metadata = true; decrement_active_streams_locked(exec_ctx, transport_global, stream_global); } if (close_writes && !stream_global->write_closed) { + stream_global->write_closed_error = GRPC_ERROR_REF(error); stream_global->write_closed = true; - if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) { + if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.write_state != + GRPC_CHTTP2_WRITING_INACTIVE) { GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes"); grpc_chttp2_list_add_closed_waiting_for_writing(transport_global, stream_global); @@ -1516,7 +1740,6 @@ void grpc_chttp2_mark_stream_closed( } } if (stream_global->read_closed && stream_global->write_closed) { - stream_global->removal_error = GRPC_ERROR_REF(error); if (stream_global->id != 0 && TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) { grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, @@ -1524,7 +1747,8 @@ void grpc_chttp2_mark_stream_closed( } else { if (stream_global->id != 0) { remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), - stream_global->id, GRPC_ERROR_REF(error)); + stream_global->id, + removal_error(GRPC_ERROR_REF(error), stream_global)); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } @@ -1649,6 +1873,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, 1, error); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "close_from_api"); } typedef struct { @@ -1678,8 +1904,14 @@ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } /** update window from a settings change */ +typedef struct { + grpc_chttp2_transport *t; + grpc_exec_ctx *exec_ctx; +} update_global_window_args; + static void update_global_window(void *args, uint32_t id, void *stream) { - grpc_chttp2_transport *t = args; + update_global_window_args *a = args; + grpc_chttp2_transport *t = a->t; grpc_chttp2_stream *s = stream; grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_stream_global *stream_global = &s->global; @@ -1693,7 +1925,8 @@ static void update_global_window(void *args, uint32_t id, void *stream) { is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(a->exec_ctx, transport_global, stream_global, + true, "update_global_window"); } } @@ -1801,14 +2034,19 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; /* copy parsing qbuf to global qbuf */ - gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); + if (t->parsing.qbuf.count > 0) { + gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "parsing_qbuf"); + } /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); transport_global->concurrent_stream_count = (uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map); if (transport_parsing->initial_window_update != 0) { + update_global_window_args args = {t, exec_ctx}; grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, - update_global_window, t); + update_global_window, &args); transport_parsing->initial_window_update = 0; } /* handle higher level things */ @@ -1831,7 +2069,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(stream_global->write_closed); GPR_ASSERT(stream_global->read_closed); remove_stream(exec_ctx, t, stream_global->id, - GRPC_ERROR_REF(stream_global->removal_error)); + removal_error(GRPC_ERROR_NONE, stream_global)); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } @@ -1854,11 +2092,12 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, } drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; - if (!t->executor.writing_active && t->ep) { - grpc_endpoint_destroy(exec_ctx, t->ep); - t->ep = NULL; - /* safe as we still have a ref for read */ - UNREF_TRANSPORT(exec_ctx, t, "disconnect"); + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t, + write_state_name(t->executor.write_state)); + } + if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) { + destroy_endpoint(exec_ctx, t); } } else if (!t->closed) { keep_reading = true; @@ -1942,7 +2181,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, } static void incoming_byte_stream_update_flow_control( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already) { uint32_t max_recv_bytes; @@ -1977,7 +2216,8 @@ static void incoming_byte_stream_update_flow_control( add_max_recv_bytes); grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, stream_global); - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "read_incoming_stream"); } } @@ -1999,8 +2239,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global = &bs->stream->global; if (bs->is_tail) { - incoming_byte_stream_update_flow_control( - transport_global, stream_global, arg->max_size_hint, bs->slices.length); + incoming_byte_stream_update_flow_control(exec_ctx, transport_global, + stream_global, arg->max_size_hint, + bs->slices.length); } if (bs->slices.count > 0) { *arg->slice = gpr_slice_buffer_take_first(&bs->slices); @@ -2184,7 +2425,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, if (context == NULL) { *scope = NULL; gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val); - result = gpr_leftpad(buf, ' ', 40); + result = gpr_leftpad(buf, ' ', 60); gpr_free(buf); return result; } @@ -2197,7 +2438,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, gpr_free(tmp); } gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val); - result = gpr_leftpad(buf, ' ', 40); + result = gpr_leftpad(buf, ' ', 60); gpr_free(buf); return result; } @@ -2230,7 +2471,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, tmp_phase = gpr_leftpad(phase, ' ', 8); tmp_scope1 = gpr_leftpad(scope1, ' ', 11); - gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1); + gpr_asprintf(&prefix, "FLOW %s: %s %s ", tmp_phase, clisvr, scope1); gpr_free(tmp_phase); gpr_free(tmp_scope1); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 8d79e93ceb..e1dcf5262a 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -305,6 +305,22 @@ typedef struct grpc_chttp2_executor_action_header { void *arg; } grpc_chttp2_executor_action_header; +typedef enum { + /** no writing activity */ + GRPC_CHTTP2_WRITING_INACTIVE, + /** write has been requested, but not scheduled yet */ + GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, + GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + /** write has been requested and scheduled against the workqueue */ + GRPC_CHTTP2_WRITE_SCHEDULED, + /** write has been initiated after being reaped from the workqueue */ + GRPC_CHTTP2_WRITING, + /** write has been initiated, AND another write needs to be started once it's + done */ + GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, + GRPC_CHTTP2_WRITING_STALE_NO_POLLER, +} grpc_chttp2_write_state; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -319,10 +335,10 @@ struct grpc_chttp2_transport { /** is a thread currently in the global lock */ bool global_active; - /** is a thread currently writing */ - bool writing_active; /** is a thread currently parsing */ bool parsing_active; + /** write execution state of the transport */ + grpc_chttp2_write_state write_state; grpc_chttp2_executor_action_header *pending_actions_head; grpc_chttp2_executor_action_header *pending_actions_tail; @@ -342,7 +358,8 @@ struct grpc_chttp2_transport { /** global state for reading/writing */ grpc_chttp2_transport_global global; /** state only accessible by the chain of execution that - set writing_active=1 */ + set writing_state >= GRPC_WRITING, and only by the writing closure + chain. */ grpc_chttp2_transport_writing writing; /** state only accessible by the chain of execution that set parsing_active=1 */ @@ -363,6 +380,8 @@ struct grpc_chttp2_transport { grpc_closure reading_action; /** closure to actually do parsing */ grpc_closure parsing_action; + /** closure to initiate writing */ + grpc_closure initiate_writing; /** incoming read bytes */ gpr_slice_buffer read_buffer; @@ -436,8 +455,10 @@ typedef struct { bool seen_error; bool exceeded_metadata_size; - /** the error that resulted in this stream being removed */ - grpc_error *removal_error; + /** the error that resulted in this stream being read-closed */ + grpc_error *read_closed_error; + /** the error that resulted in this stream being write-closed */ + grpc_error *write_closed_error; bool published_initial_metadata; bool published_trailing_metadata; @@ -514,15 +535,20 @@ struct grpc_chttp2_stream { }; /** Transport writing call flow: - chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes - are required; - if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the - writes. - Once writes have been completed (meaning another write could potentially be - started), - grpc_chttp2_terminate_writing is called. This will call - grpc_chttp2_cleanup_writing, at which - point the write phase is complete. */ + grpc_chttp2_initiate_write() is called anywhere that we know bytes need to + go out on the wire. + If no other write has been started, a task is enqueued onto our workqueue. + When that task executes, it obtains the global lock, and gathers the data + to write. + The global lock is dropped and we do the syscall to write. + After writing, a follow-up check is made to see if another round of writing + should be performed. + + The actual call chain is documented in the implementation of this function. + */ +void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + bool covered_by_poller, const char *reason); /** Someone is unlocking the transport mutex: check to see if writes are required, and schedule them if so */ @@ -610,9 +636,8 @@ int grpc_chttp2_list_pop_check_read_ops( void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); -void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, - bool is_window_available); +bool grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing); void grpc_chttp2_list_add_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, @@ -822,7 +847,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, /** add a ref to the stream and add it to the writable list; ref will be dropped in writing.c */ -void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); +void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + bool covered_by_poller, const char *reason); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 84eb5752f1..e1fc0ddee2 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -154,10 +154,8 @@ void grpc_chttp2_publish_reads( transport_parsing, outgoing_window); is_zero = transport_global->outgoing_window <= 0; if (was_zero && !is_zero) { - while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, - &stream_global)) { - grpc_chttp2_become_writable(transport_global, stream_global); - } + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "new_global_flow_control"); } if (transport_parsing->incoming_window < @@ -168,6 +166,8 @@ void grpc_chttp2_publish_reads( announce_incoming_window, announce_bytes); GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing, incoming_window, announce_bytes); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "global incoming window"); } /* for each stream that saw an update, fixup global state */ @@ -190,7 +190,8 @@ void grpc_chttp2_publish_reads( outgoing_window); is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "stream.read_flow_control"); } stream_global->max_recv_bytes -= (uint32_t)GPR_MIN( diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 8f3ab00e6d..2eb5f5f632 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -329,6 +329,7 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing); + gpr_log(GPR_DEBUG, "writing stalled %d", stream->global.id); if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) { GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled"); } @@ -336,27 +337,28 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT); } -void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, - bool is_window_available) { +bool grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream *stream; + bool out = false; grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); while (stream_list_pop(transport, &stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { - if (is_window_available) { - grpc_chttp2_become_writable(&transport->global, &stream->global); - } else { - grpc_chttp2_list_add_stalled_by_transport(transport_writing, - &stream->writing); - } + gpr_log(GPR_DEBUG, "move %d from writing stalled to just stalled", + stream->global.id); + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + &stream->writing); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global, "chttp2_writing_stalled"); + out = true; } + return out; } void grpc_chttp2_list_add_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { + gpr_log(GPR_DEBUG, "stalled %d", stream_writing->id); stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index b19f5f068d..e0d87725e9 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -75,9 +75,13 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, transport_global, outgoing_window); - bool is_window_available = transport_writing->outgoing_window > 0; - grpc_chttp2_list_flush_writing_stalled_by_transport( - exec_ctx, transport_writing, is_window_available); + if (transport_writing->outgoing_window > 0) { + while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, + &stream_global)) { + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "transport.read_flow_control"); + } + } /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ @@ -331,6 +335,12 @@ void grpc_chttp2_cleanup_writing( grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; + if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx, + transport_writing)) { + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "resume_stalled_stream"); + } + while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { if (stream_writing->sent_initial_metadata) { diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index 1ab3733d38..f901fcf962 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -65,3 +65,7 @@ void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { char* grpc_endpoint_get_peer(grpc_endpoint* ep) { return ep->vtable->get_peer(ep); } + +grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) { + return ep->vtable->get_workqueue(ep); +} diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index f9808bbda1..894efc0b23 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -51,6 +51,7 @@ struct grpc_endpoint_vtable { gpr_slice_buffer *slices, grpc_closure *cb); void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb); + grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep); void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -69,6 +70,9 @@ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, char *grpc_endpoint_get_peer(grpc_endpoint *ep); +/* Retrieve a reference to the workqueue associated with this endpoint */ +grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep); + /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index cf0fe736a0..6a63c4d1d1 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -57,6 +57,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -113,9 +114,7 @@ struct grpc_fd { grpc_closure *read_closure; grpc_closure *write_closure; - /* The polling island to which this fd belongs to and the mutex protecting the - the field */ - gpr_mu pi_mu; + /* The polling island to which this fd belongs to (protected by mu) */ struct polling_island *polling_island; struct grpc_fd *freelist_next; @@ -152,16 +151,17 @@ static void fd_global_shutdown(void); * Polling island Declarations */ -// #define GRPC_PI_REF_COUNT_DEBUG +//#define GRPC_PI_REF_COUNT_DEBUG #ifdef GRPC_PI_REF_COUNT_DEBUG #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__) -#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__) +#define PI_UNREF(exec_ctx, p, r) \ + pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__) #else /* defined(GRPC_PI_REF_COUNT_DEBUG) */ #define PI_ADD_REF(p, r) pi_add_ref((p)) -#define PI_UNREF(p, r) pi_unref((p)) +#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p)) #endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */ @@ -172,7 +172,7 @@ typedef struct polling_island { Once the ref count becomes zero, this structure is destroyed which means we should ensure that there is never a scenario where a PI_ADD_REF() is racing with a PI_UNREF() that just made the ref_count zero. */ - gpr_refcount ref_count; + gpr_atm ref_count; /* Pointer to the polling_island this merged into. * merged_to value is only set once in polling_island's lifetime (and that too @@ -184,6 +184,9 @@ typedef struct polling_island { * (except mu and ref_count) are invalid and must be ignored. */ gpr_atm merged_to; + /* The workqueue associated with this polling island */ + grpc_workqueue *workqueue; + /* The fd of the underlying epoll set */ int epoll_fd; @@ -191,11 +194,6 @@ typedef struct polling_island { size_t fd_cnt; size_t fd_capacity; grpc_fd **fds; - - /* Polling islands that are no longer needed are kept in a freelist so that - they can be reused. This field points to the next polling island in the - free list */ - struct polling_island *next_free; } polling_island; /******************************************************************************* @@ -253,13 +251,14 @@ struct grpc_pollset_set { * Common helpers */ -static void append_error(grpc_error **composite, grpc_error *error, +static bool append_error(grpc_error **composite, grpc_error *error, const char *desc) { - if (error == GRPC_ERROR_NONE) return; + if (error == GRPC_ERROR_NONE) return true; if (*composite == GRPC_ERROR_NONE) { *composite = GRPC_ERROR_CREATE(desc); } *composite = grpc_error_add_child(*composite, error); + return false; } /******************************************************************************* @@ -275,11 +274,8 @@ static void append_error(grpc_error **composite, grpc_error *error, threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */ static grpc_wakeup_fd polling_island_wakeup_fd; -/* Polling island freelist */ -static gpr_mu g_pi_freelist_mu; -static polling_island *g_pi_freelist = NULL; - -static void polling_island_delete(); /* Forward declaration */ +/* Forward declaration */ +static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -293,28 +289,35 @@ gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ #ifdef GRPC_PI_REF_COUNT_DEBUG -void pi_add_ref(polling_island *pi); -void pi_unref(polling_island *pi); +static void pi_add_ref(polling_island *pi); +static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi); -void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, int line) { - long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count)); +static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, + int line) { + long old_cnt = gpr_atm_acq_load(&pi->ref_count); pi_add_ref(pi); gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, old_cnt + 1, reason, file, line); } -void pi_unref_dbg(polling_island *pi, char *reason, char *file, int line) { - long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count)); - pi_unref(pi); +static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi, + char *reason, char *file, int line) { + long old_cnt = gpr_atm_acq_load(&pi->ref_count); + pi_unref(exec_ctx, pi); gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, (old_cnt - 1), reason, file, line); } #endif -void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); } +static void pi_add_ref(polling_island *pi) { + gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1); +} -void pi_unref(polling_island *pi) { - /* If ref count went to zero, delete the polling island. +static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) { + /* If ref count went to one, we're back to just the workqueue owning a ref. + Unref the workqueue to break the loop. + + If ref count went to zero, delete the polling island. Note that this deletion not be done under a lock. Once the ref count goes to zero, we are guaranteed that no one else holds a reference to the polling island (and that there is no racing pi_add_ref() call either). @@ -322,12 +325,20 @@ void pi_unref(polling_island *pi) { Also, if we are deleting the polling island and the merged_to field is non-empty, we should remove a ref to the merged_to polling island */ - if (gpr_unref(&pi->ref_count)) { - polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); - polling_island_delete(pi); - if (next != NULL) { - PI_UNREF(next, "pi_delete"); /* Recursive call */ + switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) { + case 2: /* last external ref: the only one now owned is by the workqueue */ + GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island"); + break; + case 1: { + polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); + polling_island_delete(exec_ctx, pi); + if (next != NULL) { + PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */ + } + break; } + case 0: + GPR_UNREACHABLE_CODE(return ); } } @@ -462,69 +473,68 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd, } /* Might return NULL in case of an error */ -static polling_island *polling_island_create(grpc_fd *initial_fd, +static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, + grpc_fd *initial_fd, grpc_error **error) { polling_island *pi = NULL; - char *err_msg; const char *err_desc = "polling_island_create"; - /* Try to get one from the polling island freelist */ - gpr_mu_lock(&g_pi_freelist_mu); - if (g_pi_freelist != NULL) { - pi = g_pi_freelist; - g_pi_freelist = g_pi_freelist->next_free; - pi->next_free = NULL; - } - gpr_mu_unlock(&g_pi_freelist_mu); + *error = GRPC_ERROR_NONE; - /* Create new polling island if we could not get one from the free list */ - if (pi == NULL) { - pi = gpr_malloc(sizeof(*pi)); - gpr_mu_init(&pi->mu); - pi->fd_cnt = 0; - pi->fd_capacity = 0; - pi->fds = NULL; - } + pi = gpr_malloc(sizeof(*pi)); + gpr_mu_init(&pi->mu); + pi->fd_cnt = 0; + pi->fd_capacity = 0; + pi->fds = NULL; + pi->epoll_fd = -1; + pi->workqueue = NULL; - gpr_ref_init(&pi->ref_count, 0); + gpr_atm_rel_store(&pi->ref_count, 0); gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (pi->epoll_fd < 0) { - gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno, - strerror(errno)); - append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); - gpr_free(err_msg); - } else { - polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); - pi->next_free = NULL; + append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc); + goto done; + } - if (initial_fd != NULL) { - /* Lock the polling island here just in case we got this structure from - the freelist and the polling island lock was not released yet (by the - code that adds the polling island to the freelist) */ - gpr_mu_lock(&pi->mu); - polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); - gpr_mu_unlock(&pi->mu); - } + polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); + + if (initial_fd != NULL) { + polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); + } + + if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue), + err_desc) && + *error == GRPC_ERROR_NONE) { + polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true, + error); + GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL); + pi->workqueue->wakeup_read_fd->polling_island = pi; + PI_ADD_REF(pi, "fd"); } +done: + if (*error != GRPC_ERROR_NONE) { + if (pi->workqueue != NULL) { + GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island"); + } + polling_island_delete(exec_ctx, pi); + pi = NULL; + } return pi; } -static void polling_island_delete(polling_island *pi) { +static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) { GPR_ASSERT(pi->fd_cnt == 0); - gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); - - close(pi->epoll_fd); - pi->epoll_fd = -1; - - gpr_mu_lock(&g_pi_freelist_mu); - pi->next_free = g_pi_freelist; - g_pi_freelist = pi; - gpr_mu_unlock(&g_pi_freelist_mu); + if (pi->epoll_fd >= 0) { + close(pi->epoll_fd); + } + gpr_mu_destroy(&pi->mu); + gpr_free(pi->fds); + gpr_free(pi); } /* Attempts to gets the last polling island in the linked list (liked by the @@ -704,9 +714,6 @@ static polling_island *polling_island_merge(polling_island *p, static grpc_error *polling_island_global_init() { grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_init(&g_pi_freelist_mu); - g_pi_freelist = NULL; - error = grpc_wakeup_fd_init(&polling_island_wakeup_fd); if (error == GRPC_ERROR_NONE) { error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd); @@ -716,18 +723,6 @@ static grpc_error *polling_island_global_init() { } static void polling_island_global_shutdown() { - polling_island *next; - gpr_mu_lock(&g_pi_freelist_mu); - gpr_mu_unlock(&g_pi_freelist_mu); - while (g_pi_freelist != NULL) { - next = g_pi_freelist->next_free; - gpr_mu_destroy(&g_pi_freelist->mu); - gpr_free(g_pi_freelist->fds); - gpr_free(g_pi_freelist); - g_pi_freelist = next; - } - gpr_mu_destroy(&g_pi_freelist_mu); - grpc_wakeup_fd_destroy(&polling_island_wakeup_fd); } @@ -845,7 +840,6 @@ static grpc_fd *fd_create(int fd, const char *name) { if (new_fd == NULL) { new_fd = gpr_malloc(sizeof(grpc_fd)); gpr_mu_init(&new_fd->mu); - gpr_mu_init(&new_fd->pi_mu); } /* Note: It is not really needed to get the new_fd->mu lock here. If this is a @@ -896,6 +890,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, const char *reason) { bool is_fd_closed = false; grpc_error *error = GRPC_ERROR_NONE; + polling_island *unref_pi = NULL; gpr_mu_lock(&fd->mu); fd->on_done_closure = on_done; @@ -923,21 +918,26 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - Unlock the latest polling island - Set fd->polling_island to NULL (but remove the ref on the polling island before doing this.) */ - gpr_mu_lock(&fd->pi_mu); if (fd->polling_island != NULL) { polling_island *pi_latest = polling_island_lock(fd->polling_island); polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error); gpr_mu_unlock(&pi_latest->mu); - PI_UNREF(fd->polling_island, "fd_orphan"); + unref_pi = fd->polling_island; fd->polling_island = NULL; } - gpr_mu_unlock(&fd->pi_mu); grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL); gpr_mu_unlock(&fd->mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ + if (unref_pi != NULL) { + /* Unref stale polling island here, outside the fd lock above. + The polling island owns a workqueue which owns an fd, and unreffing + inside the lock can cause an eventual lock loop that makes TSAN very + unhappy. */ + PI_UNREF(exec_ctx, unref_pi, "fd_orphan"); + } GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); } @@ -1037,6 +1037,17 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_mu_unlock(&fd->mu); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { + gpr_mu_lock(&fd->mu); + grpc_workqueue *workqueue = NULL; + if (fd->polling_island != NULL) { + workqueue = + GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue"); + } + gpr_mu_unlock(&fd->mu); + return workqueue; +} + /******************************************************************************* * Pollset Definitions */ @@ -1227,9 +1238,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_unlock(&fd->mu); } -static void pollset_release_polling_island(grpc_pollset *ps, char *reason) { +static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, + grpc_pollset *ps, char *reason) { if (ps->polling_island != NULL) { - PI_UNREF(ps->polling_island, reason); + PI_UNREF(exec_ctx, ps->polling_island, reason); } ps->polling_island = NULL; } @@ -1242,7 +1254,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, pollset->finish_shutdown_called = true; /* Release the ref and set pollset->polling_island to NULL */ - pollset_release_polling_island(pollset, "ps_shutdown"); + pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown"); grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } @@ -1281,7 +1293,7 @@ static void pollset_reset(grpc_pollset *pollset) { pollset->finish_shutdown_called = false; pollset->kicked_without_pollers = false; pollset->shutdown_done = NULL; - pollset_release_polling_island(pollset, "ps_reset"); + GPR_ASSERT(pollset->polling_island == NULL); } #define GRPC_EPOLL_MAX_EVENTS 1000 @@ -1309,7 +1321,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, this function (i.e pollset_work_and_unlock()) is called */ if (pollset->polling_island == NULL) { - pollset->polling_island = polling_island_create(NULL, error); + pollset->polling_island = polling_island_create(exec_ctx, NULL, error); if (pollset->polling_island == NULL) { GPR_TIMER_END("pollset_work_and_unlock", 0); return; /* Fatal error. We cannot continue */ @@ -1329,7 +1341,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the polling island to be deleted */ PI_ADD_REF(pi, "ps"); - PI_UNREF(pollset->polling_island, "ps"); + PI_UNREF(exec_ctx, pollset->polling_island, "ps"); pollset->polling_island = pi; } @@ -1400,7 +1412,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, that we got before releasing the polling island lock). This is because pollset->polling_island pointer might get udpated in other parts of the code when there is an island merge while we are doing epoll_wait() above */ - PI_UNREF(pi, "ps_work"); + PI_UNREF(exec_ctx, pi, "ps_work"); GPR_TIMER_END("pollset_work_and_unlock", 0); } @@ -1517,10 +1529,11 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&pollset->mu); - gpr_mu_lock(&fd->pi_mu); + gpr_mu_lock(&fd->mu); polling_island *pi_new = NULL; +retry: /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and * equal, do nothing. * 2) If fd->polling_island and pollset->polling_island are both NULL, create @@ -1535,15 +1548,44 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * polling_island fields in both fd and pollset to point to the merged * polling island. */ + + if (fd->orphaned) { + gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&pollset->mu); + /* early out */ + return; + } + if (fd->polling_island == pollset->polling_island) { pi_new = fd->polling_island; if (pi_new == NULL) { - pi_new = polling_island_create(fd, &error); - - GRPC_POLLING_TRACE( - "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, " - "pollset: %p)", - (void *)pi_new, fd->fd, (void *)pollset); + /* Unlock before creating a new polling island: the polling island will + create a workqueue which creates a file descriptor, and holding an fd + lock here can eventually cause a loop to appear to TSAN (making it + unhappy). We don't think it's a real loop (there's an epoch point where + that loop possibility disappears), but the advantages of keeping TSAN + happy outweigh any performance advantage we might have by keeping the + lock held. */ + gpr_mu_unlock(&fd->mu); + pi_new = polling_island_create(exec_ctx, fd, &error); + gpr_mu_lock(&fd->mu); + /* Need to reverify any assumptions made between the initial lock and + getting to this branch: if they've changed, we need to throw away our + work and figure things out again. */ + if (fd->polling_island != NULL) { + GRPC_POLLING_TRACE( + "pollset_add_fd: Raced creating new polling island. pi_new: %p " + "(fd: %d, pollset: %p)", + (void *)pi_new, fd->fd, (void *)pollset); + PI_ADD_REF(pi_new, "dance_of_destruction"); + PI_UNREF(exec_ctx, pi_new, "dance_of_destruction"); + goto retry; + } else { + GRPC_POLLING_TRACE( + "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, " + "pollset: %p)", + (void *)pi_new, fd->fd, (void *)pollset); + } } } else if (fd->polling_island == NULL) { pi_new = polling_island_lock(pollset->polling_island); @@ -1579,7 +1621,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (fd->polling_island != pi_new) { PI_ADD_REF(pi_new, "fd"); if (fd->polling_island != NULL) { - PI_UNREF(fd->polling_island, "fd"); + PI_UNREF(exec_ctx, fd->polling_island, "fd"); } fd->polling_island = pi_new; } @@ -1587,13 +1629,15 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->polling_island != pi_new) { PI_ADD_REF(pi_new, "ps"); if (pollset->polling_island != NULL) { - PI_UNREF(pollset->polling_island, "ps"); + PI_UNREF(exec_ctx, pollset->polling_island, "ps"); } pollset->polling_island = pi_new; } - gpr_mu_unlock(&fd->pi_mu); + gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&pollset->mu); + + GRPC_LOG_IF_ERROR("pollset_add_fd", error); } /******************************************************************************* @@ -1744,9 +1788,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, void *grpc_fd_get_polling_island(grpc_fd *fd) { polling_island *pi; - gpr_mu_lock(&fd->pi_mu); + gpr_mu_lock(&fd->mu); pi = fd->polling_island; - gpr_mu_unlock(&fd->pi_mu); + gpr_mu_unlock(&fd->mu); return pi; } @@ -1794,6 +1838,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 9e306af5fa..c2107e5e39 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -725,6 +725,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, GRPC_FD_UNREF(fd, "poll"); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } + /******************************************************************************* * pollset_posix.c */ @@ -2006,6 +2008,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 45c0a5e954..4b593f4b2c 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -617,6 +617,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, GRPC_FD_UNREF(fd, "poll"); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } + /******************************************************************************* * pollset_posix.c */ @@ -1234,6 +1236,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index a3c1e9db9a..6536672685 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -148,6 +148,10 @@ grpc_fd *grpc_fd_create(int fd, const char *name) { return g_event_engine->fd_create(fd, name); } +grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) { + return g_event_engine->fd_get_workqueue(fd); +} + int grpc_fd_wrapped_fd(grpc_fd *fd) { return g_event_engine->fd_wrapped_fd(fd); } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 579c84ef70..c2aa1756ea 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -56,6 +56,7 @@ typedef struct grpc_event_engine_vtable { void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); bool (*fd_is_shutdown)(grpc_fd *fd); + grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd); grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, grpc_fd *fd); @@ -107,6 +108,9 @@ const char *grpc_get_poll_strategy_name(); This takes ownership of closing fd. */ grpc_fd *grpc_fd_create(int fd, const char *name); +/* Get a workqueue that's associated with this fd */ +grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd); + /* Return the wrapped fd, or -1 if it has been released or closed. */ int grpc_fd_wrapped_fd(grpc_fd *fd); diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index c44aafcddf..ac7785ec13 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -37,6 +37,7 @@ #include #include +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) { @@ -85,14 +86,17 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null) { - GPR_ASSERT(offload_target_or_null == NULL); - grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + if (offload_target_or_null == NULL) { + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + } else { + grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error); + GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched"); + } } void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, grpc_closure_list *list, grpc_workqueue *offload_target_or_null) { - GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_move(list, &exec_ctx->closure_list); } diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 38f27d9b13..917f332f03 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -93,7 +93,11 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); /** Finish any pending work for a grpc_exec_ctx. Must be called before * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); -/** Add a closure to be executed at the next flush/finish point */ +/** Add a closure to be executed in the future. + If \a offload_target_or_null is NULL, the closure will be executed at the + next exec_ctx.{finish,flush} point. + If \a offload_target_or_null is non-NULL, the closure will be scheduled + against the workqueue, and a reference to the workqueue will be consumed. */ void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null); diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 89292a153e..d67d388b8c 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -45,6 +45,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" @@ -62,6 +63,7 @@ void grpc_iomgr_init(void) { grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; + grpc_network_status_init(); grpc_iomgr_platform_init(); } @@ -140,6 +142,7 @@ void grpc_iomgr_shutdown(void) { grpc_iomgr_platform_shutdown(); grpc_exec_ctx_global_shutdown(); + grpc_network_status_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); } diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c index 38a1c9b7d4..b4bb7e3cf7 100644 --- a/src/core/lib/iomgr/network_status_tracker.c +++ b/src/core/lib/iomgr/network_status_tracker.c @@ -42,10 +42,16 @@ typedef struct endpoint_ll_node { static endpoint_ll_node *head = NULL; static gpr_mu g_endpoint_mutex; -static bool g_init_done = false; -void grpc_initialize_network_status_monitor() { - g_init_done = true; +void grpc_network_status_shutdown(void) { + if (head != NULL) { + gpr_log(GPR_ERROR, + "Memory leaked as all network endpoints were not shut down"); + } + gpr_mu_destroy(&g_endpoint_mutex); +} + +void grpc_network_status_init(void) { gpr_mu_init(&g_endpoint_mutex); // TODO(makarandd): Install callback with OS to monitor network status. } @@ -60,9 +66,6 @@ void grpc_destroy_network_status_monitor() { } void grpc_network_status_register_endpoint(grpc_endpoint *ep) { - if (!g_init_done) { - grpc_initialize_network_status_monitor(); - } gpr_mu_lock(&g_endpoint_mutex); if (head == NULL) { head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node)); diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h index 74a1aa8135..67cb645f44 100644 --- a/src/core/lib/iomgr/network_status_tracker.h +++ b/src/core/lib/iomgr/network_status_tracker.h @@ -35,7 +35,11 @@ #define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H #include "src/core/lib/iomgr/endpoint.h" +void grpc_network_status_init(void); +void grpc_network_status_shutdown(void); + void grpc_network_status_register_endpoint(grpc_endpoint *ep); void grpc_network_status_unregister_endpoint(grpc_endpoint *ep); void grpc_network_status_shutdown_all_endpoints(); + #endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 2ab45e33ce..ec21e03944 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -284,7 +284,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } /* returns true if done, false if pending; if returning true, *error is set */ -#define MAX_WRITE_IOVEC 16 +#define MAX_WRITE_IOVEC 1024 static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; @@ -450,9 +450,19 @@ static char *tcp_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, - tcp_shutdown, tcp_destroy, tcp_get_peer}; +static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return grpc_fd_get_workqueue(tcp->em_fd); +} + +static const grpc_endpoint_vtable vtable = {tcp_read, + tcp_write, + tcp_get_workqueue, + tcp_add_to_pollset, + tcp_add_to_pollset_set, + tcp_shutdown, + tcp_destroy, + tcp_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, const char *peer_string) { diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index d3803c3bd0..cb2ff782d6 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -491,7 +491,8 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { } for (unsigned i = 0; i < count; i++) { - int fd, port; + int fd = -1; + int port = -1; grpc_dualstack_mode dsmode; err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0, &dsmode, &fd); diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 37ab59021e..35054c42b5 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -389,9 +389,16 @@ static char *win_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static grpc_endpoint_vtable vtable = { - win_read, win_write, win_add_to_pollset, win_add_to_pollset_set, - win_shutdown, win_destroy, win_get_peer}; +static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; } + +static grpc_endpoint_vtable vtable = {win_read, + win_write, + win_get_workqueue, + win_add_to_pollset, + win_add_to_pollset_set, + win_shutdown, + win_destroy, + win_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 5cc40eea50..7156e490d7 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -38,6 +38,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/pollset_set.h" #ifdef GPR_POSIX_SOCKET #include "src/core/lib/iomgr/workqueue_posix.h" @@ -49,35 +50,45 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ -/** Create a work queue */ -grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, - grpc_workqueue **workqueue); - +/* Deprecated: do not use. + This has *already* been removed in a future commit. */ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); -#define GRPC_WORKQUEUE_REFCOUNT_DEBUG +/* Reference counting functions. Use the macro's always + (GRPC_WORKQUEUE_{REF,UNREF}). + + Pass in a descriptive reason string for reffing/unreffing as the last + argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that + string will be printed alongside the refcount. When it is not defined, the + string will be discarded at compilation time. */ + +//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #define GRPC_WORKQUEUE_REF(p, r) \ - grpc_workqueue_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_WORKQUEUE_UNREF(cl, p, r) \ - grpc_workqueue_unref((cl), (p), __FILE__, __LINE__, (r)) + (grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p)) +#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \ + grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, const char *file, int line, const char *reason); #else -#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p)) +#define GRPC_WORKQUEUE_REF(p, r) (grpc_workqueue_ref((p)), (p)) #define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p)) void grpc_workqueue_ref(grpc_workqueue *workqueue); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); #endif -/** Bind this workqueue to a pollset */ -void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, - grpc_pollset *pollset); +/** Add a work item to a workqueue. Items added to a work queue will be started + in approximately the order they were enqueued, on some thread that may or + may not be the current thread. Successive closures enqueued onto a workqueue + MAY be executed concurrently. + + It is generally more expensive to add a closure to a workqueue than to the + execution context, both in terms of CPU work and in execution latency. -/** Add a work item to a workqueue */ + Use work queues when it's important that other threads be given a chance to + tackle some workload. */ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error); diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 45e0f6063b..e0d6dac230 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -70,7 +70,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, static void workqueue_destroy(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - GPR_ASSERT(grpc_closure_list_empty(workqueue->closure_list)); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd); } @@ -100,12 +100,6 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { } } -void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, - grpc_pollset *pollset) { - grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd); -} - void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_lock(&workqueue->mu); grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h index dcb47e7b59..0f26ba58e2 100644 --- a/src/core/lib/iomgr/workqueue_posix.h +++ b/src/core/lib/iomgr/workqueue_posix.h @@ -50,4 +50,9 @@ struct grpc_workqueue { grpc_closure read_closure; }; +/** Create a work queue. Returns an error if creation fails. If creation + succeeds, sets *workqueue to point to it. */ +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue); + #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */ diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c index 275f040b1c..23e2dea185 100644 --- a/src/core/lib/iomgr/workqueue_windows.c +++ b/src/core/lib/iomgr/workqueue_windows.c @@ -37,4 +37,26 @@ #include "src/core/lib/iomgr/workqueue.h" +// Minimal implementation of grpc_workqueue for Windows +// Works by directly enqueuing workqueue items onto the current execution +// context, which is at least correct, if not performant or in the spirit of +// workqueues. + +void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason) {} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + const char *file, int line, const char *reason) {} +#else +void grpc_workqueue_ref(grpc_workqueue *workqueue) {} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} +#endif + +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); +} + #endif /* GPR_WINDOWS */ diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 7650d68e89..bc50f9d1b0 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -360,11 +360,19 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) { return grpc_endpoint_get_peer(ep->wrapped_ep); } -static const grpc_endpoint_vtable vtable = { - endpoint_read, endpoint_write, - endpoint_add_to_pollset, endpoint_add_to_pollset_set, - endpoint_shutdown, endpoint_destroy, - endpoint_get_peer}; +static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + return grpc_endpoint_get_workqueue(ep->wrapped_ep); +} + +static const grpc_endpoint_vtable vtable = {endpoint_read, + endpoint_write, + endpoint_get_workqueue, + endpoint_add_to_pollset, + endpoint_add_to_pollset_set, + endpoint_shutdown, + endpoint_destroy, + endpoint_get_peer}; grpc_endpoint *grpc_secure_endpoint_create( struct tsi_frame_protector *protector, grpc_endpoint *transport, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index def6e5068b..2f108af48a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -73,6 +73,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct requested_call { requested_call_type type; + size_t cq_idx; void *tag; grpc_server *server; grpc_completion_queue *cq_bound_to_call; @@ -206,11 +207,11 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; - /** free list of available requested_calls indices */ - gpr_stack_lockfree *request_freelist; + /** free list of available requested_calls_per_cq indices */ + gpr_stack_lockfree **request_freelist_per_cq; /** requested call backing data */ - requested_call *requested_calls; - size_t max_requested_calls; + requested_call **requested_calls_per_cq; + int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -357,7 +358,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, for (size_t i = 0; i < server->cq_count; i++) { while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != -1) { - fail_call(exec_ctx, server, i, &server->requested_calls[request_id], + fail_call(exec_ctx, server, i, + &server->requested_calls_per_cq[i][request_id], GRPC_ERROR_REF(error)); } } @@ -392,12 +394,16 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); + if (server->started) { + gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); + gpr_free(server->requested_calls_per_cq[i]); + } } - gpr_stack_lockfree_destroy(server->request_freelist); + gpr_free(server->request_freelist_per_cq); + gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); - gpr_free(server->requested_calls); gpr_free(server); } @@ -460,11 +466,13 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, requested_call *rc = req; grpc_server *server = rc->server; - if (rc >= server->requested_calls && - rc < server->requested_calls + server->max_requested_calls) { - GPR_ASSERT(rc - server->requested_calls <= INT_MAX); - gpr_stack_lockfree_push(server->request_freelist, - (int)(rc - server->requested_calls)); + if (rc >= server->requested_calls_per_cq[rc->cq_idx] && + rc < server->requested_calls_per_cq[rc->cq_idx] + + server->max_requested_calls_per_cq) { + GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); + gpr_stack_lockfree_push( + server->request_freelist_per_cq[rc->cq_idx], + (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); } else { gpr_free(req); } @@ -540,7 +548,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls[request_id]); + &server->requested_calls_per_cq[cq_idx][request_id]); return; /* early out */ } } @@ -979,8 +987,6 @@ void grpc_server_register_non_listening_completion_queue( } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { - size_t i; - GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); grpc_server *server = gpr_malloc(sizeof(grpc_server)); @@ -998,15 +1004,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { &server->root_channel_data; /* TODO(ctiller): expose a channel_arg for this */ - server->max_requested_calls = 32768; - server->request_freelist = - gpr_stack_lockfree_create(server->max_requested_calls); - for (i = 0; i < (size_t)server->max_requested_calls; i++) { - gpr_stack_lockfree_push(server->request_freelist, (int)i); - } - server->requested_calls = gpr_malloc(server->max_requested_calls * - sizeof(*server->requested_calls)); - + server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1066,16 +1064,28 @@ void grpc_server_start(grpc_server *server) { server->started = true; size_t pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); + server->request_freelist_per_cq = + gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); + server->requested_calls_per_cq = + gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]); } + server->request_freelist_per_cq[i] = + gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); + for (int j = 0; j < server->max_requested_calls_per_cq; j++) { + gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); + } + server->requested_calls_per_cq[i] = + gpr_malloc((size_t)server->max_requested_calls_per_cq * + sizeof(*server->requested_calls_per_cq[i])); } request_matcher_init(&server->unregistered_request_matcher, - server->max_requested_calls, server); + (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, server->max_requested_calls, - server); + request_matcher_init(&rm->request_matcher, + (size_t)server->max_requested_calls_per_cq, server); } for (l = server->listeners; l; l = l->next) { @@ -1307,11 +1317,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GRPC_ERROR_CREATE("Server Shutdown")); return GRPC_CALL_OK; } - request_id = gpr_stack_lockfree_pop(server->request_freelist); + request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); if (request_id == -1) { /* out of request ids: just fail this one */ fail_call(exec_ctx, server, cq_idx, rc, - GRPC_ERROR_CREATE("Server Shutdown")); + grpc_error_set_int(GRPC_ERROR_CREATE("Out of request ids"), + GRPC_ERROR_INT_LIMIT, + server->max_requested_calls_per_cq)); return GRPC_CALL_OK; } switch (rc->type) { @@ -1322,7 +1334,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &rc->data.registered.registered_method->request_matcher; break; } - server->requested_calls[request_id] = *rc; + server->requested_calls_per_cq[cq_idx][request_id] = *rc; gpr_free(rc); if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start @@ -1346,7 +1358,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls[request_id]); + &server->requested_calls_per_cq[cq_idx][request_id]); } gpr_mu_lock(&server->mu_call); } @@ -1382,6 +1394,7 @@ grpc_call_error grpc_server_request_call( } grpc_cq_begin_op(cq_for_notification, tag); details->reserved = NULL; + rc->cq_idx = cq_idx; rc->type = BATCH_CALL; rc->server = server; rc->tag = tag; @@ -1430,6 +1443,7 @@ grpc_call_error grpc_server_request_registered_call( goto done; } grpc_cq_begin_op(cq_for_notification, tag); + rc->cq_idx = cq_idx; rc->type = REGISTERED_CALL; rc->server = server; rc->tag = tag; diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 054f112127..68d05e3a85 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -179,6 +179,9 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; + if (grpc_connectivity_state_trace) { + gpr_log(GPR_DEBUG, "NOTIFY: %p", w->notify); + } grpc_exec_ctx_sched(exec_ctx, w->notify, GRPC_ERROR_REF(tracker->current_error), NULL); gpr_free(w); diff --git a/test/core/end2end/tests/high_initial_seqno.c b/test/core/end2end/tests/high_initial_seqno.c index 50e3c9cb89..db45f5eb5a 100644 --- a/test/core/end2end/tests/high_initial_seqno.c +++ b/test/core/end2end/tests/high_initial_seqno.c @@ -203,6 +203,12 @@ static void simple_request_body(grpc_end2end_test_fixture f) { grpc_call_destroy(c); grpc_call_destroy(s); + /* TODO(ctiller): this rate limits the test, and it should be removed when + retry has been implemented; until then cross-thread chatter + may result in some requests needing to be cancelled due to + seqno exhaustion. */ + cq_verify_empty(cqv); + cq_verifier_destroy(cqv); } diff --git a/test/core/end2end/tests/network_status_change.c b/test/core/end2end/tests/network_status_change.c index 10207844ab..39ddc13754 100644 --- a/test/core/end2end/tests/network_status_change.c +++ b/test/core/end2end/tests/network_status_change.c @@ -186,9 +186,10 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) { GPR_ASSERT(GRPC_CALL_OK == error); cq_expect_completion(cqv, tag(102), 1); + cq_verify(cqv); + // Simulate the network loss event grpc_network_status_shutdown_all_endpoints(); - cq_verify(cqv); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; @@ -205,7 +206,7 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) { op++; error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); GPR_ASSERT(GRPC_CALL_OK == error); - void shutdown_all_endpoints(); + cq_expect_completion(cqv, tag(103), 1); cq_expect_completion(cqv, tag(1), 1); cq_verify(cqv); diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c index 5e86c42309..27d630623e 100644 --- a/test/core/internal_api_canaries/iomgr.c +++ b/test/core/internal_api_canaries/iomgr.c @@ -77,11 +77,14 @@ static void test_code(void) { /* endpoint.h */ grpc_endpoint endpoint; - grpc_endpoint_vtable vtable = { - grpc_endpoint_read, grpc_endpoint_write, - grpc_endpoint_add_to_pollset, grpc_endpoint_add_to_pollset_set, - grpc_endpoint_shutdown, grpc_endpoint_destroy, - grpc_endpoint_get_peer}; + grpc_endpoint_vtable vtable = {grpc_endpoint_read, + grpc_endpoint_write, + grpc_endpoint_get_workqueue, + grpc_endpoint_add_to_pollset, + grpc_endpoint_add_to_pollset_set, + grpc_endpoint_shutdown, + grpc_endpoint_destroy, + grpc_endpoint_get_peer}; endpoint.vtable = &vtable; grpc_endpoint_read(&exec_ctx, &endpoint, NULL, NULL); diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c deleted file mode 100644 index 76ecfae74b..0000000000 --- a/test/core/iomgr/workqueue_test.c +++ /dev/null @@ -1,150 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/iomgr/workqueue.h" - -#include -#include -#include - -#include "test/core/util/test_config.h" - -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; - -static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); - gpr_mu_lock(g_mu); - *(int *)p = 1; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); - gpr_mu_unlock(g_mu); -} - -static void test_ref_unref(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq; - GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", - grpc_workqueue_create(&exec_ctx, &wq))); - GRPC_WORKQUEUE_REF(wq, "test"); - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "test"); - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); -} - -static void test_add_closure(void) { - grpc_closure c; - int done = 0; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq; - GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", - grpc_workqueue_create(&exec_ctx, &wq))); - gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5); - grpc_pollset_worker *worker = NULL; - grpc_closure_init(&c, must_succeed, &done); - - grpc_workqueue_enqueue(&exec_ctx, wq, &c, GRPC_ERROR_NONE); - grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); - - gpr_mu_lock(g_mu); - GPR_ASSERT(!done); - while (!done) { - GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(deadline.clock_type), deadline))); - } - gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); - GPR_ASSERT(done); - - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); -} - -static void test_flush(void) { - grpc_closure c; - int done = 0; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq; - GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", - grpc_workqueue_create(&exec_ctx, &wq))); - gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5); - grpc_pollset_worker *worker = NULL; - grpc_closure_init(&c, must_succeed, &done); - - grpc_exec_ctx_sched(&exec_ctx, &c, GRPC_ERROR_NONE, NULL); - grpc_workqueue_flush(&exec_ctx, wq); - grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); - - gpr_mu_lock(g_mu); - GPR_ASSERT(!done); - while (!done) { - GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(deadline.clock_type), deadline))); - } - gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); - GPR_ASSERT(done); - - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); -} - -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, - grpc_error *error) { - grpc_pollset_destroy(p); -} - -int main(int argc, char **argv) { - grpc_closure destroyed; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_test_init(argc, argv); - grpc_init(); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); - - test_ref_unref(); - test_add_closure(); - test_flush(); - - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); - grpc_exec_ctx_finish(&exec_ctx); - grpc_shutdown(); - - gpr_free(g_pollset); - return 0; -} diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index ed9545e9df..13e0e918fb 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -95,9 +95,17 @@ static char *me_get_peer(grpc_endpoint *ep) { return gpr_strdup("fake:mock_endpoint"); } +static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; } + static const grpc_endpoint_vtable vtable = { - me_read, me_write, me_add_to_pollset, me_add_to_pollset_set, - me_shutdown, me_destroy, me_get_peer, + me_read, + me_write, + me_get_workqueue, + me_add_to_pollset, + me_add_to_pollset_set, + me_shutdown, + me_destroy, + me_get_peer, }; grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice)) { diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index a39f3dd66e..7ed9e97bd6 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -140,9 +140,17 @@ static char *me_get_peer(grpc_endpoint *ep) { return gpr_strdup("fake:mock_endpoint"); } +static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; } + static const grpc_endpoint_vtable vtable = { - me_read, me_write, me_add_to_pollset, me_add_to_pollset_set, - me_shutdown, me_destroy, me_get_peer, + me_read, + me_write, + me_get_workqueue, + me_add_to_pollset, + me_add_to_pollset_set, + me_shutdown, + me_destroy, + me_get_peer, }; static void half_init(half *m, passthru_endpoint *parent) { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 354a59cedd..0f87ae3e44 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -1166,6 +1166,9 @@ TEST_P(ProxyEnd2endTest, HugeResponse) { request.mutable_param()->set_response_message_length(kResponseSize); ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds(20); + context.set_deadline(deadline); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(kResponseSize, response.message().size()); EXPECT_TRUE(s.ok()); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 1507d1e3d6..5dd0bd8533 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -206,21 +206,30 @@ class AsyncClient : public ClientImpl { void* got_tag; bool ok; - if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { - // Got a regular event, so process it - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (!ctx->RunNextState(ok, histogram)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - auto clone = ctx->StartNewClone(); - clone->Start(cli_cqs_[thread_idx].get()); - // delete the old version - delete ctx; + switch (cli_cqs_[thread_idx]->AsyncNext( + &got_tag, &ok, + std::chrono::system_clock::now() + std::chrono::milliseconds(10))) { + case CompletionQueue::SHUTDOWN: + return false; + case CompletionQueue::GOT_EVENT: { + // Got a regular event, so process it + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + if (!ctx->RunNextState(ok, histogram)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + auto clone = ctx->StartNewClone(); + clone->Start(cli_cqs_[thread_idx].get()); + // delete the old version + delete ctx; + } + return true; } - return true; - } else { // queue is shutting down - return false; + case CompletionQueue::TIMEOUT: + // TODO(ctiller): do something here to track how frequently we pass + // through this codepath. + return true; } + GPR_UNREACHABLE_CODE(return false); } protected: diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py index 34b8151441..4ff4e44b8b 100755 --- a/test/cpp/qps/gen_build_yaml.py +++ b/test/cpp/qps/gen_build_yaml.py @@ -45,9 +45,10 @@ import performance.scenario_config as scenario_config def _scenario_json_string(scenario_json): # tweak parameters to get fast test times - scenario_json['warmup_seconds'] = 1 + scenario_json['warmup_seconds'] = 0 scenario_json['benchmark_seconds'] = 1 - return json.dumps(scenario_config.remove_nonproto_fields(scenario_json)) + scenarios_json = {'scenarios': [scenario_config.remove_nonproto_fields(scenario_json)]} + return json.dumps(scenarios_json) def threads_of_type(scenario_json, path): d = scenario_json @@ -72,8 +73,7 @@ print yaml.dump({ { 'name': 'json_run_localhost', 'shortname': 'json_run_localhost:%s' % scenario_json['name'], - 'args': ['--scenario_json', - pipes.quote(_scenario_json_string(scenario_json))], + 'args': ['--scenarios_json', _scenario_json_string(scenario_json)], 'ci_platforms': ['linux', 'mac', 'posix', 'windows'], 'platforms': ['linux', 'mac', 'posix', 'windows'], 'flaky': False, @@ -81,7 +81,8 @@ print yaml.dump({ 'boringssl': True, 'defaults': 'boringssl', 'cpu_cost': guess_cpu(scenario_json), - 'exclude_configs': [] + 'exclude_configs': [], + 'timeout_seconds': 3*60 } for scenario_json in scenario_config.CXXLanguage().scenarios() ] diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc index 6545dc2917..74e40fbf1a 100644 --- a/test/cpp/qps/json_run_localhost.cc +++ b/test/cpp/qps/json_run_localhost.cc @@ -75,7 +75,7 @@ int main(int argc, char** argv) { for (int i = 1; i < argc; i++) { args.push_back(argv[i]); } - SubProcess(args).Join(); + GPR_ASSERT(SubProcess(args).Join() == 0); for (auto it = jobs.begin(); it != jobs.end(); ++it) { (*it)->Interrupt(); diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c9954d0d02..73ca19148b 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -102,7 +102,7 @@ class AsyncQpsServerTest : public Server { auto process_rpc_bound = std::bind(process_rpc, config.payload_config(), _1, _2); - for (int i = 0; i < 10000 / num_threads; i++) { + for (int i = 0; i < 15000; i++) { for (int j = 0; j < num_threads; j++) { if (request_unary_function) { auto request_unary = @@ -132,7 +132,8 @@ class AsyncQpsServerTest : public Server { for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { (*ss)->set_shutdown(); } - server_->Shutdown(); + server_->Shutdown(std::chrono::system_clock::now() + + std::chrono::seconds(3)); for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); } diff --git a/tools/dockerfile/test/python_pyenv_x64/Dockerfile b/tools/dockerfile/test/python_pyenv_x64/Dockerfile new file mode 100644 index 0000000000..abb5f3c89b --- /dev/null +++ b/tools/dockerfile/test/python_pyenv_x64/Dockerfile @@ -0,0 +1,112 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +FROM debian:jessie + +# Install Git and basic packages. +RUN apt-get update && apt-get install -y \ + autoconf \ + autotools-dev \ + build-essential \ + bzip2 \ + ccache \ + curl \ + gcc \ + gcc-multilib \ + git \ + golang \ + gyp \ + lcov \ + libc6 \ + libc6-dbg \ + libc6-dev \ + libgtest-dev \ + libtool \ + make \ + perl \ + strace \ + python-dev \ + python-setuptools \ + python-yaml \ + telnet \ + unzip \ + wget \ + zip && apt-get clean + +#================ +# Build profiling +RUN apt-get update && apt-get install -y time && apt-get clean + +#==================== +# Python dependencies + +# Install dependencies + +RUN apt-get update && apt-get install -y \ + python-all-dev \ + python3-all-dev \ + python-pip + +# Install Python packages from PyPI +RUN pip install pip --upgrade +RUN pip install virtualenv +RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.0.0a2 six==1.10.0 + +# Install dependencies for pyenv +RUN apt-get update && apt-get install -y \ + libbz2-dev \ + libncurses5-dev \ + libncursesw5-dev \ + libreadline-dev \ + libsqlite3-dev \ + libssl-dev \ + llvm \ + mercurial \ + zlib1g-dev && apt-get clean + +# Install Pyenv and dev Python versions 3.5 and 3.6 +RUN curl -L https://raw.githubusercontent.com/yyuu/pyenv-installer/master/bin/pyenv-installer | bash +RUN pyenv update +RUN pyenv install 3.5-dev +RUN pyenv install 3.6-dev +RUN pyenv local 3.5-dev 3.6-dev + +# Prepare ccache +RUN ln -s /usr/bin/ccache /usr/local/bin/gcc +RUN ln -s /usr/bin/ccache /usr/local/bin/g++ +RUN ln -s /usr/bin/ccache /usr/local/bin/cc +RUN ln -s /usr/bin/ccache /usr/local/bin/c++ +RUN ln -s /usr/bin/ccache /usr/local/bin/clang +RUN ln -s /usr/bin/ccache /usr/local/bin/clang++ + + +RUN mkdir /var/local/jenkins + +# Define the default command. +CMD ["bash"] diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 95d53e5f9e..7dd9e12614 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -39,6 +39,7 @@ import json import multiprocessing import os import os.path +import pipes import platform import random import re @@ -72,6 +73,9 @@ def platform_string(): return jobset.platform_string() +_DEFAULT_TIMEOUT_SECONDS = 5 * 60 + + # SimpleConfig: just compile with CONFIG=config, and run the binary to test class Config(object): @@ -84,7 +88,7 @@ class Config(object): self.tool_prefix = tool_prefix self.timeout_multiplier = timeout_multiplier - def job_spec(self, cmdline, timeout_seconds=5*60, + def job_spec(self, cmdline, timeout_seconds=_DEFAULT_TIMEOUT_SECONDS, shortname=None, environ={}, cpu_cost=1.0, flaky=False): """Construct a jobset.JobSpec for a test under this config @@ -159,7 +163,7 @@ class CLanguage(object): env={'GRPC_DEFAULT_SSL_ROOTS_FILE_PATH': _ROOT + '/src/core/lib/tsi/test_creds/ca.pem', 'GRPC_POLL_STRATEGY': polling_strategy} - shortname_ext = '' if polling_strategy=='all' else ' polling=%s' % polling_strategy + shortname_ext = '' if polling_strategy=='all' else ' GRPC_POLL_STRATEGY=%s' % polling_strategy if self.config.build_config in target['exclude_configs']: continue if self.platform == 'windows': @@ -190,28 +194,26 @@ class CLanguage(object): assert line[1] == ' ' test = base + line.strip() cmdline = [binary] + ['--gtest_filter=%s' % test] - out.append(self.config.job_spec(cmdline, [binary], - shortname='%s:%s %s' % (binary, test, shortname_ext), + out.append(self.config.job_spec(cmdline, + shortname='%s --gtest_filter=%s %s' % (binary, test, shortname_ext), cpu_cost=target['cpu_cost'], environ=env)) else: cmdline = [binary] + target['args'] - out.append(self.config.job_spec(cmdline, [binary], - shortname=' '.join(cmdline) + shortname_ext, + out.append(self.config.job_spec(cmdline, + shortname=' '.join( + pipes.quote(arg) + for arg in cmdline) + + shortname_ext, cpu_cost=target['cpu_cost'], flaky=target.get('flaky', False), + timeout_seconds=target.get('timeout_seconds', _DEFAULT_TIMEOUT_SECONDS), environ=env)) elif self.args.regex == '.*' or self.platform == 'windows': print '\nWARNING: binary not found, skipping', binary return sorted(out) def make_targets(self): - test_regex = self.args.regex - if self.platform != 'windows' and self.args.regex != '.*': - # use the regex to minimize the number of things to build - return [os.path.basename(target['name']) - for target in get_c_tests(False, self.test_lang) - if re.search(test_regex, '/' + target['name'])] if self.platform == 'windows': # don't build tools on windows just yet return ['buildtests_%s' % self.make_target] @@ -1283,8 +1285,6 @@ def _build_and_run( jobset.message( 'FLAKE', '%s [%d/%d runs flaked]' % (k, num_failures, num_runs), do_newline=True) - else: - jobset.message('PASSED', k, do_newline=True) finally: for antagonist in antagonists: antagonist.kill() diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index cdbc254f43..e3cfd55cd6 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -1818,22 +1818,6 @@ "third_party": false, "type": "target" }, - { - "deps": [ - "gpr", - "gpr_test_util", - "grpc", - "grpc_test_util" - ], - "headers": [], - "language": "c", - "name": "workqueue_test", - "src": [ - "test/core/iomgr/workqueue_test.c" - ], - "third_party": false, - "type": "target" - }, { "deps": [ "gpr", diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 93d42e3454..d94301b946 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -1935,25 +1935,6 @@ "windows" ] }, - { - "args": [], - "ci_platforms": [ - "linux", - "mac", - "posix" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "flaky": false, - "gtest": false, - "language": "c", - "name": "workqueue_test", - "platforms": [ - "linux", - "mac", - "posix" - ] - }, { "args": [], "ci_platforms": [ @@ -27153,8 +27134,8 @@ }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27175,12 +27156,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_secure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_streaming_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27201,12 +27183,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_secure" + "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_unary_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27227,12 +27210,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_secure" + "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_sync_unary_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27253,12 +27237,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_secure" + "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27279,12 +27264,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_secure" + "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27305,12 +27291,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_secure" + "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27331,12 +27318,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_secure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27357,12 +27345,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_secure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27383,12 +27372,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_insecure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27409,12 +27399,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_insecure" + "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_unary_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27435,12 +27426,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_insecure" + "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_sync_unary_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27461,12 +27453,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_insecure" + "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27487,12 +27480,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_insecure" + "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27513,12 +27507,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_insecure" + "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27539,12 +27534,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_insecure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27565,7 +27561,8 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_insecure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_insecure", + "timeout_seconds": 180 }, { "args": [ -- cgit v1.2.3 From b3ce178b28ebb842408f8cdc5b116816a0171095 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 20 Jul 2016 09:25:25 -0700 Subject: clang-format --- .../chttp2/client/insecure/channel_create.c | 9 ++++----- .../chttp2/client/secure/secure_channel_create.c | 22 ++++++++++------------ .../chttp2/server/insecure/server_chttp2.c | 6 +++--- .../chttp2/server/secure/server_secure_chttp2.c | 4 ++-- src/core/lib/channel/handshaker.c | 14 +++++--------- src/core/lib/channel/handshaker.h | 11 ++++------- 6 files changed, 28 insertions(+), 38 deletions(-) (limited to 'src/core/ext/transport/chttp2') diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 154c4493ff..162cc6bd6a 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -88,7 +88,7 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, } static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args* args, void *user_data) { + grpc_channel_args *args, void *user_data) { connector *c = user_data; c->result->transport = grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1); @@ -114,10 +114,9 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, &c->initial_string_sent); } else { - grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, tcp, - c->args.channel_args, - c->args.deadline, on_handshake_done, - c); + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, + c->args.deadline, on_handshake_done, c); } } else { memset(c->result, 0, sizeof(*c->result)); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 4554c33c8c..dde35f253d 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -74,7 +74,7 @@ typedef struct { grpc_handshake_manager *handshake_mgr; // TODO(roth): Remove once we eliminate on_secure_handshake_done(). - grpc_channel_args* tmp_args; + grpc_channel_args *tmp_args; } connector; static void connector_ref(grpc_connector *con) { @@ -117,8 +117,8 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); auth_context_arg = grpc_auth_context_to_arg(auth_context); - c->result->channel_args = grpc_channel_args_copy_and_add( - c->tmp_args, &auth_context_arg, 1); + c->result->channel_args = + grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1); } grpc_closure *notify = c->notify; c->notify = NULL; @@ -126,7 +126,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args* args, void *user_data) { + grpc_channel_args *args, void *user_data) { connector *c = user_data; // TODO(roth, jboeuf): Convert security connector handshaking to use new // handshake API, and then move the code from on_secure_handshake_done() @@ -140,10 +140,9 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, - c->connecting_endpoint, - c->args.channel_args, c->args.deadline, - on_handshake_done, c); + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args, + c->args.deadline, on_handshake_done, c); } static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -163,10 +162,9 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, &c->initial_string_sent); } else { - grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, tcp, - c->args.channel_args, - c->args.deadline, on_handshake_done, - c); + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, + c->args.deadline, on_handshake_done, c); } } else { memset(c->result, 0, sizeof(*c->result)); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 920875f694..b8d816c127 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -55,7 +55,7 @@ typedef struct server_connect_state { } server_connect_state; static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args* args, void *user_data) { + grpc_channel_args *args, void *user_data) { server_connect_state *state = user_data; /* * Beware that the call to grpc_create_chttp2_transport() has to happen before @@ -64,8 +64,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, * (as in server_secure_chttp2.c) needs to add synchronization to avoid this * case. */ - grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, args, endpoint, 0); + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, args, endpoint, 0); grpc_server_setup_transport(exec_ctx, state->server, transport, state->accepting_pollset, grpc_server_get_channel_args(state->server)); diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index e3184bc1f9..bc714f4e5b 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -74,7 +74,7 @@ typedef struct server_secure_connect { // TODO(roth): Remove the following two fields when we eliminate // grpc_server_security_connector_do_handshake(). gpr_timespec deadline; - grpc_channel_args* args; + grpc_channel_args *args; } server_secure_connect; static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); } @@ -129,7 +129,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, } static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args* args, void *user_data) { + grpc_channel_args *args, void *user_data) { server_secure_connect *state = user_data; // TODO(roth, jboeuf): Convert security connector handshaking to use new // handshake API, and then move the code from on_secure_handshake_done() diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 7dcbe1df9c..9191ae6339 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -129,8 +129,7 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, // handshakers together. static void call_next_handshaker(grpc_exec_ctx* exec_ctx, grpc_endpoint* endpoint, - grpc_channel_args* args, - void* user_data) { + grpc_channel_args* args, void* user_data) { grpc_handshake_manager* mgr = user_data; GPR_ASSERT(mgr->state != NULL); GPR_ASSERT(mgr->state->index < mgr->count); @@ -153,13 +152,10 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx, } } -void grpc_handshake_manager_do_handshake(grpc_exec_ctx* exec_ctx, - grpc_handshake_manager* mgr, - grpc_endpoint* endpoint, - const grpc_channel_args* args, - gpr_timespec deadline, - grpc_handshaker_done_cb cb, - void* user_data) { +void grpc_handshake_manager_do_handshake( + grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, + grpc_endpoint* endpoint, const grpc_channel_args* args, + gpr_timespec deadline, grpc_handshaker_done_cb cb, void* user_data) { grpc_channel_args* args_copy = grpc_channel_args_copy(args); if (mgr->count == 0) { // No handshakers registered, so we just immediately call the done diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index 6a39529150..b8aa78c245 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -131,12 +131,9 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, /// invoking the first handshaker. /// If successful, invokes \a cb with \a user_data after all handshakers /// have completed. -void grpc_handshake_manager_do_handshake(grpc_exec_ctx* exec_ctx, - grpc_handshake_manager* mgr, - grpc_endpoint* endpoint, - const grpc_channel_args* args, - gpr_timespec deadline, - grpc_handshaker_done_cb cb, - void* user_data); +void grpc_handshake_manager_do_handshake( + grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, + grpc_endpoint* endpoint, const grpc_channel_args* args, + gpr_timespec deadline, grpc_handshaker_done_cb cb, void* user_data); #endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */ -- cgit v1.2.3 From 5682a52cee47d6bbdd71bf1426cd603b306cc2b7 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 20 Jul 2016 09:54:41 -0700 Subject: Add acceptor parameter. --- .../ext/transport/chttp2/client/insecure/channel_create.c | 2 +- .../transport/chttp2/client/secure/secure_channel_create.c | 4 ++-- .../ext/transport/chttp2/server/insecure/server_chttp2.c | 3 ++- .../transport/chttp2/server/secure/server_secure_chttp2.c | 2 +- src/core/lib/channel/handshaker.c | 13 +++++++++---- src/core/lib/channel/handshaker.h | 12 +++++++++--- 6 files changed, 24 insertions(+), 12 deletions(-) (limited to 'src/core/ext/transport/chttp2') diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 162cc6bd6a..645a011748 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -116,7 +116,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } else { grpc_handshake_manager_do_handshake( exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, - c->args.deadline, on_handshake_done, c); + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); } } else { memset(c->result, 0, sizeof(*c->result)); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index dde35f253d..01d949add3 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -142,7 +142,7 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, connector *c = arg; grpc_handshake_manager_do_handshake( exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args, - c->args.deadline, on_handshake_done, c); + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); } static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -164,7 +164,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } else { grpc_handshake_manager_do_handshake( exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, - c->args.deadline, on_handshake_done, c); + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); } } else { memset(c->result, 0, sizeof(*c->result)); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index b8d816c127..8dac63c33b 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -90,7 +90,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp, gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); grpc_handshake_manager_do_handshake(exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server), - deadline, on_handshake_done, state); + deadline, acceptor, on_handshake_done, + state); } /* Server callback: start listening on our ports */ diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index bc714f4e5b..2b25fa09e6 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -158,7 +158,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, grpc_handshake_manager_do_handshake( exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(state->state->server), state->deadline, - on_handshake_done, state); + acceptor, on_handshake_done, state); } /* Server callback: start listening on our ports */ diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 9191ae6339..9db04440ee 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -63,9 +63,10 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_endpoint* endpoint, grpc_channel_args* args, gpr_timespec deadline, + grpc_tcp_server_acceptor *acceptor, grpc_handshaker_done_cb cb, void* user_data) { handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args, - deadline, cb, user_data); + deadline, acceptor, cb, user_data); } // @@ -78,6 +79,8 @@ struct grpc_handshaker_state { size_t index; // The deadline for all handshakers. gpr_timespec deadline; + // The acceptor to call the handshakers with. + grpc_tcp_server_acceptor *acceptor; // The final callback and user_data to invoke after the last handshaker. grpc_handshaker_done_cb final_cb; void* final_user_data; @@ -142,8 +145,8 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx, } // Invoke handshaker. grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->state->index], - endpoint, args, mgr->state->deadline, cb, - user_data); + endpoint, args, mgr->state->deadline, + mgr->state->acceptor, cb, user_data); ++mgr->state->index; // If this is the last handshaker, clean up state. if (mgr->state->index == mgr->count) { @@ -155,7 +158,8 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, const grpc_channel_args* args, - gpr_timespec deadline, grpc_handshaker_done_cb cb, void* user_data) { + gpr_timespec deadline, grpc_tcp_server_acceptor *acceptor, + grpc_handshaker_done_cb cb, void* user_data) { grpc_channel_args* args_copy = grpc_channel_args_copy(args); if (mgr->count == 0) { // No handshakers registered, so we just immediately call the done @@ -166,6 +170,7 @@ void grpc_handshake_manager_do_handshake( mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state)); memset(mgr->state, 0, sizeof(*mgr->state)); mgr->state->deadline = deadline; + mgr->state->acceptor = acceptor; mgr->state->final_cb = cb; mgr->state->final_user_data = user_data; call_next_handshaker(exec_ctx, endpoint, args_copy, mgr); diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index b8aa78c245..b1e91dba4f 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -40,6 +40,7 @@ #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/tcp_server.h" /// Handshakers are used to perform initial handshakes on a connection /// before the client sends the initial request. Some examples of what @@ -71,10 +72,12 @@ struct grpc_handshaker_vtable { /// Performs handshaking. When finished, calls \a cb with \a user_data. /// Takes ownership of \a args. + /// \a acceptor will be NULL for client-side handshakers. void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_timespec deadline, grpc_handshaker_done_cb cb, - void* user_data); + gpr_timespec deadline, + grpc_tcp_server_acceptor* acceptor, + grpc_handshaker_done_cb cb, void* user_data); }; /// Base struct. To subclass, make this the first member of the @@ -99,6 +102,7 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_endpoint* endpoint, grpc_channel_args* args, gpr_timespec deadline, + grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data); /// @@ -129,11 +133,13 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, /// Invokes handshakers in the order they were added. /// Does NOT take ownership of \a args. Instead, makes a copy before /// invoking the first handshaker. +/// \a acceptor will be NULL for client-side handshakers. /// If successful, invokes \a cb with \a user_data after all handshakers /// have completed. void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, const grpc_channel_args* args, - gpr_timespec deadline, grpc_handshaker_done_cb cb, void* user_data); + gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, + grpc_handshaker_done_cb cb, void* user_data); #endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */ -- cgit v1.2.3 From b16b1f29c3aa7238814cd70a4abcee0f35dc6b9f Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 21 Jul 2016 11:24:32 -0700 Subject: clang-format --- src/core/ext/transport/chttp2/server/insecure/server_chttp2.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'src/core/ext/transport/chttp2') diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 8dac63c33b..016ce110fe 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -88,10 +88,9 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp, // args instead of hard-coding it. const gpr_timespec deadline = gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); - grpc_handshake_manager_do_handshake(exec_ctx, state->handshake_mgr, tcp, - grpc_server_get_channel_args(server), - deadline, acceptor, on_handshake_done, - state); + grpc_handshake_manager_do_handshake( + exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server), + deadline, acceptor, on_handshake_done, state); } /* Server callback: start listening on our ports */ -- cgit v1.2.3