diff options
author | 2018-11-16 10:58:12 -0800 | |
---|---|---|
committer | 2018-11-16 11:11:04 -0800 | |
commit | fc332d2c9247832af90792a59ff6d391e84bc8ae (patch) | |
tree | 4bd1db687960ca851f87d237a36f55190ac52f27 /src/core/ext/transport | |
parent | 0eb9a3e783237cd46c8ba6d3b33228f537cafbfc (diff) | |
parent | 9cfacc48ee2e9f8db083d578c84881551734b1f0 (diff) |
Merge master
Diffstat (limited to 'src/core/ext/transport')
19 files changed, 1180 insertions, 981 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index e7522ffba8..60a32022f5 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -117,6 +117,8 @@ static void on_handshake_done(void* arg, grpc_error* error) { c->args.interested_parties); c->result->transport = grpc_create_chttp2_transport(args->args, args->endpoint, true); + c->result->socket_uuid = + grpc_chttp2_transport_get_socket_uuid(c->result->transport); GPR_ASSERT(c->result->transport); // TODO(roth): We ideally want to wait until we receive HTTP/2 // settings from the server before we consider the connection @@ -158,12 +160,11 @@ static void on_handshake_done(void* arg, grpc_error* error) { static void start_handshake_locked(chttp2_connector* c) { c->handshake_mgr = grpc_handshake_manager_create(); grpc_handshakers_add(HANDSHAKER_CLIENT, c->args.channel_args, - c->handshake_mgr); + c->args.interested_parties, c->handshake_mgr); grpc_endpoint_add_to_pollset_set(c->endpoint, c->args.interested_parties); grpc_handshake_manager_do_handshake( - c->handshake_mgr, c->args.interested_parties, c->endpoint, - c->args.channel_args, c->args.deadline, nullptr /* acceptor */, - on_handshake_done, c); + c->handshake_mgr, c->endpoint, c->args.channel_args, c->args.deadline, + nullptr /* acceptor */, on_handshake_done, c); c->endpoint = nullptr; // Endpoint handed off to handshake manager. } @@ -211,9 +212,17 @@ static void chttp2_connector_connect(grpc_connector* con, GRPC_CLOSURE_INIT(&c->connected, connected, c, grpc_schedule_on_exec_ctx); GPR_ASSERT(!c->connecting); c->connecting = true; - grpc_tcp_client_connect(&c->connected, &c->endpoint, args->interested_parties, - args->channel_args, &addr, args->deadline); + grpc_closure* closure = &c->connected; + grpc_endpoint** ep = &c->endpoint; gpr_mu_unlock(&c->mu); + // In some implementations, the closure can be flushed before + // grpc_tcp_client_connect and since the closure requires access to c->mu, + // this can result in a deadlock. Refer + // https://github.com/grpc/grpc/issues/16427 + // grpc_tcp_client_connect would fill c->endpoint with proper contents and we + // make sure that we would still exist at that point by taking a ref. + grpc_tcp_client_connect(closure, ep, args->interested_parties, + args->channel_args, &addr, args->deadline); } static const grpc_connector_vtable chttp2_connector_vtable = { diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc index 5ce73a95d7..e73eee4353 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc @@ -27,7 +27,6 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" -#include "src/core/ext/filters/client_channel/uri_parser.h" #include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/memory.h" @@ -39,6 +38,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/uri/uri_parser.h" static void client_channel_factory_ref( grpc_client_channel_factory* cc_factory) {} diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 3f8a26ae32..33d2b22aa5 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -37,8 +37,10 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/handshaker_registry.h" +#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" @@ -53,6 +55,8 @@ typedef struct { grpc_closure tcp_server_shutdown_complete; grpc_closure* server_destroy_listener_done; grpc_handshake_manager* pending_handshake_mgrs; + grpc_core::RefCountedPtr<grpc_core::channelz::ListenSocketNode> + channelz_listen_socket; } server_state; typedef struct { @@ -67,6 +71,7 @@ typedef struct { grpc_timer timer; grpc_closure on_timeout; grpc_closure on_receive_settings; + grpc_pollset_set* interested_parties; } server_connection_state; static void server_connection_state_unref( @@ -76,6 +81,9 @@ static void server_connection_state_unref( GRPC_CHTTP2_UNREF_TRANSPORT(connection_state->transport, "receive settings timeout"); } + grpc_pollset_set_del_pollset(connection_state->interested_parties, + connection_state->accepting_pollset); + grpc_pollset_set_destroy(connection_state->interested_parties); gpr_free(connection_state); } } @@ -108,9 +116,16 @@ static void on_handshake_done(void* arg, grpc_error* error) { server_connection_state* connection_state = static_cast<server_connection_state*>(args->user_data); gpr_mu_lock(&connection_state->svr_state->mu); + grpc_resource_user* resource_user = grpc_server_get_default_resource_user( + connection_state->svr_state->server); if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) { const char* error_str = grpc_error_string(error); gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); + grpc_resource_user* resource_user = grpc_server_get_default_resource_user( + connection_state->svr_state->server); + if (resource_user != nullptr) { + grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); + } if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) { // We were shut down after handshaking completed successfully, so // destroy the endpoint here. @@ -129,11 +144,12 @@ static void on_handshake_done(void* arg, grpc_error* error) { // handshaker may have handed off the connection to some external // code, so we can just clean up here without creating a transport. if (args->endpoint != nullptr) { - grpc_transport* transport = - grpc_create_chttp2_transport(args->args, args->endpoint, false); + grpc_transport* transport = grpc_create_chttp2_transport( + args->args, args->endpoint, false, resource_user); grpc_server_setup_transport( connection_state->svr_state->server, transport, - connection_state->accepting_pollset, args->args); + connection_state->accepting_pollset, args->args, + grpc_chttp2_transport_get_socket_uuid(transport), resource_user); // Use notify_on_receive_settings callback to enforce the // handshake deadline. connection_state->transport = @@ -152,6 +168,11 @@ static void on_handshake_done(void* arg, grpc_error* error) { connection_state, grpc_schedule_on_exec_ctx); grpc_timer_init(&connection_state->timer, connection_state->deadline, &connection_state->on_timeout); + } else { + if (resource_user != nullptr) { + grpc_resource_user_free(resource_user, + GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); + } } } grpc_handshake_manager_pending_list_remove( @@ -176,6 +197,20 @@ static void on_accept(void* arg, grpc_endpoint* tcp, gpr_free(acceptor); return; } + grpc_resource_user* resource_user = + grpc_server_get_default_resource_user(state->server); + if (resource_user != nullptr && + !grpc_resource_user_safe_alloc(resource_user, + GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) { + gpr_log( + GPR_ERROR, + "Memory quota exhausted, rejecting the connection, no handshaking."); + gpr_mu_unlock(&state->mu); + grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE); + grpc_endpoint_destroy(tcp); + gpr_free(acceptor); + return; + } grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create(); grpc_handshake_manager_pending_list_add(&state->pending_handshake_mgrs, handshake_mgr); @@ -189,7 +224,11 @@ static void on_accept(void* arg, grpc_endpoint* tcp, connection_state->accepting_pollset = accepting_pollset; connection_state->acceptor = acceptor; connection_state->handshake_mgr = handshake_mgr; + connection_state->interested_parties = grpc_pollset_set_create(); + grpc_pollset_set_add_pollset(connection_state->interested_parties, + connection_state->accepting_pollset); grpc_handshakers_add(HANDSHAKER_SERVER, state->args, + connection_state->interested_parties, connection_state->handshake_mgr); const grpc_arg* timeout_arg = grpc_channel_args_find(state->args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS); @@ -197,10 +236,10 @@ static void on_accept(void* arg, grpc_endpoint* tcp, grpc_core::ExecCtx::Get()->Now() + grpc_channel_arg_get_integer(timeout_arg, {120 * GPR_MS_PER_SEC, 1, INT_MAX}); - grpc_handshake_manager_do_handshake( - connection_state->handshake_mgr, nullptr /* interested_parties */, tcp, - state->args, connection_state->deadline, acceptor, on_handshake_done, - connection_state); + grpc_handshake_manager_do_handshake(connection_state->handshake_mgr, tcp, + state->args, connection_state->deadline, + acceptor, on_handshake_done, + connection_state); } /* Server callback: start listening on our ports */ @@ -223,6 +262,7 @@ static void tcp_server_shutdown_complete(void* arg, grpc_error* error) { GPR_ASSERT(state->shutdown); grpc_handshake_manager_pending_list_shutdown_all( state->pending_handshake_mgrs, GRPC_ERROR_REF(error)); + state->channelz_listen_socket.reset(); gpr_mu_unlock(&state->mu); // Flush queued work before destroying handshaker factory, since that // may do a synchronous unref. @@ -262,6 +302,8 @@ grpc_error* grpc_chttp2_server_add_port(grpc_server* server, const char* addr, server_state* state = nullptr; grpc_error** errors = nullptr; size_t naddrs = 0; + const grpc_arg* arg = nullptr; + intptr_t socket_uuid = 0; *port_num = -1; @@ -323,9 +365,17 @@ grpc_error* grpc_chttp2_server_add_port(grpc_server* server, const char* addr, } grpc_resolved_addresses_destroy(resolved); + arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ); + if (grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT)) { + state->channelz_listen_socket = + grpc_core::MakeRefCounted<grpc_core::channelz::ListenSocketNode>( + grpc_core::UniquePtr<char>(gpr_strdup(addr))); + socket_uuid = state->channelz_listen_socket->uuid(); + } + /* Register with the server only upon success */ grpc_server_add_listener(server, state, server_start_listener, - server_destroy_listener); + server_destroy_listener, socket_uuid); goto done; /* Error path: cleanup and return */ diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index e4bd91d07b..b9024a87e2 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -61,7 +61,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, grpc_endpoint_add_to_pollset(server_endpoint, pollsets[i]); } - grpc_server_setup_transport(server, transport, nullptr, server_args); + grpc_server_setup_transport(server, transport, nullptr, server_args, 0); grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 8e07e3e4f9..da29ff1b37 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -54,6 +54,7 @@ #include "src/core/lib/transport/timeout_encoding.h" #include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport_impl.h" +#include "src/core/lib/uri/uri_parser.h" #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu @@ -155,51 +156,55 @@ bool g_flow_control_enabled = true; * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ -static void destruct_transport(grpc_chttp2_transport* t) { +grpc_chttp2_transport::~grpc_chttp2_transport() { gpr_log(GPR_INFO, "destruct transport %p", t); size_t i; - grpc_endpoint_destroy(t->ep); + if (channelz_socket != nullptr) { + channelz_socket.reset(); + } + + grpc_endpoint_destroy(ep); - grpc_slice_buffer_destroy_internal(&t->qbuf); + grpc_slice_buffer_destroy_internal(&qbuf); - grpc_slice_buffer_destroy_internal(&t->outbuf); - grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); + grpc_slice_buffer_destroy_internal(&outbuf); + grpc_chttp2_hpack_compressor_destroy(&hpack_compressor); - grpc_core::ContextList::Execute(t->cl, nullptr, GRPC_ERROR_NONE); + grpc_core::ContextList::Execute(cl, nullptr, GRPC_ERROR_NONE); + grpc_slice_buffer_destroy_internal(&read_buffer); + grpc_chttp2_hpack_parser_destroy(&hpack_parser); + grpc_chttp2_goaway_parser_destroy(&goaway_parser); - grpc_slice_buffer_destroy_internal(&t->read_buffer); - grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); - grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); for (i = 0; i < STREAM_LIST_COUNT; i++) { - GPR_ASSERT(t->lists[i].head == nullptr); - GPR_ASSERT(t->lists[i].tail == nullptr); + GPR_ASSERT(lists[i].head == nullptr); + GPR_ASSERT(lists[i].tail == nullptr); } - GRPC_ERROR_UNREF(t->goaway_error); + GRPC_ERROR_UNREF(goaway_error); - GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0); + GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0); - grpc_chttp2_stream_map_destroy(&t->stream_map); - grpc_connectivity_state_destroy(&t->channel_callback.state_tracker); + grpc_chttp2_stream_map_destroy(&stream_map); + grpc_connectivity_state_destroy(&channel_callback.state_tracker); - GRPC_COMBINER_UNREF(t->combiner, "chttp2_transport"); + GRPC_COMBINER_UNREF(combiner, "chttp2_transport"); - cancel_pings(t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); + cancel_pings(this, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); - while (t->write_cb_pool) { - grpc_chttp2_write_cb* next = t->write_cb_pool->next; - gpr_free(t->write_cb_pool); - t->write_cb_pool = next; + while (write_cb_pool) { + grpc_chttp2_write_cb* next = write_cb_pool->next; + gpr_free(write_cb_pool); + write_cb_pool = next; } - t->flow_control.Destroy(); + flow_control.Destroy(); - GRPC_ERROR_UNREF(t->closed_with_error); - gpr_free(t->ping_acks); - gpr_free(t->peer_string); - gpr_free(t); + GRPC_ERROR_UNREF(closed_with_error); + gpr_free(ping_acks); + gpr_free(peer_string); } #ifndef NDEBUG @@ -211,7 +216,8 @@ void grpc_chttp2_unref_transport(grpc_chttp2_transport* t, const char* reason, t, val, val - 1, reason, file, line); } if (!gpr_unref(&t->refs)) return; - destruct_transport(t); + t->~grpc_chttp2_transport(); + gpr_free(t); } void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason, @@ -226,7 +232,8 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason, #else void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) { if (!gpr_unref(&t->refs)) return; - destruct_transport(t); + t->~grpc_chttp2_transport(); + gpr_free(t); } void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } @@ -234,35 +241,178 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } static const grpc_transport_vtable* get_vtable(void); -static void init_transport(grpc_chttp2_transport* t, - const grpc_channel_args* channel_args, - grpc_endpoint* ep, bool is_client) { +/* Returns whether bdp is enabled */ +static bool read_channel_args(grpc_chttp2_transport* t, + const grpc_channel_args* channel_args, + bool is_client) { + bool enable_bdp = true; + bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT; size_t i; int j; - GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == - GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); - - t->base.vtable = get_vtable(); - t->ep = ep; - /* one ref is for destroy */ - gpr_ref_init(&t->refs, 1); - t->combiner = grpc_combiner_create(); - t->peer_string = grpc_endpoint_get_peer(ep); - t->endpoint_reading = 1; - t->next_stream_id = is_client ? 1 : 2; - t->is_client = is_client; - t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; - t->is_first_frame = true; - grpc_connectivity_state_init( - &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, - is_client ? "client_transport" : "server_transport"); - - grpc_slice_buffer_init(&t->qbuf); - - grpc_slice_buffer_init(&t->outbuf); - grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); + for (i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + if ((t->next_stream_id & 1) != (value & 1)) { + gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1, + is_client ? "client" : "server"); + } else { + t->next_stream_id = static_cast<uint32_t>(value); + } + } + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + grpc_chttp2_hpack_compressor_set_max_usable_size( + &t->hpack_compressor, static_cast<uint32_t>(value)); + } + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { + t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer( + &channel_args->args[i], + {g_default_max_pings_without_data, 0, INT_MAX}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { + t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer( + &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX}); + } else if (0 == + strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) { + t->ping_policy.min_sent_ping_interval_without_data = + grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{ + g_default_min_sent_ping_interval_without_data_ms, 0, + INT_MAX}); + } else if (0 == + strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) { + t->ping_policy.min_recv_ping_interval_without_data = + grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{ + g_default_min_recv_ping_interval_without_data_ms, 0, + INT_MAX}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { + t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer( + &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE})); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { + enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{t->is_client + ? g_default_client_keepalive_time_ms + : g_default_server_keepalive_time_ms, + 1, INT_MAX}); + t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{t->is_client + ? g_default_client_keepalive_timeout_ms + : g_default_server_keepalive_timeout_ms, + 0, INT_MAX}); + t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { + t->keepalive_permit_without_calls = static_cast<uint32_t>( + grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1})); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_OPTIMIZATION_TARGET)) { + if (channel_args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s should be a string", + GRPC_ARG_OPTIMIZATION_TARGET); + } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == + strcmp(channel_args->args[i].value.string, "throughput")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; + } else { + gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", + GRPC_ARG_OPTIMIZATION_TARGET, + channel_args->args[i].value.string); + } + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) { + channelz_enabled = grpc_channel_arg_get_bool( + &channel_args->args[i], GRPC_ENABLE_CHANNELZ_DEFAULT); + } else { + static const struct { + const char* channel_arg_name; + grpc_chttp2_setting_id setting_id; + grpc_integer_options integer_options; + bool availability[2] /* server, client */; + } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS, + GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + {-1, 0, INT32_MAX}, + {true, false}}, + {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, + GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_MAX_METADATA_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + {-1, 16384, 16777215}, + {true, true}}, + {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY, + GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, + {1, 0, 1}, + {true, true}}, + {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + {-1, 5, INT32_MAX}, + {true, true}}}; + for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) { + if (0 == strcmp(channel_args->args[i].key, + settings_map[j].channel_arg_name)) { + if (!settings_map[j].availability[is_client]) { + gpr_log(GPR_DEBUG, "%s is not available on %s", + settings_map[j].channel_arg_name, + is_client ? "clients" : "servers"); + } else { + int value = grpc_channel_arg_get_integer( + &channel_args->args[i], settings_map[j].integer_options); + if (value >= 0) { + queue_setting_update(t, settings_map[j].setting_id, + static_cast<uint32_t>(value)); + } + } + break; + } + } + } + } + if (channelz_enabled) { + // TODO(ncteisen): add an API to endpoint to query for local addr, and pass + // it in here, so SocketNode knows its own address. + t->channelz_socket = + grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>( + grpc_core::UniquePtr<char>(), + grpc_core::UniquePtr<char>(gpr_strdup(t->peer_string))); + } + return enable_bdp; +} +static void init_transport_closures(grpc_chttp2_transport* t) { GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, @@ -290,56 +440,9 @@ static void init_transport(grpc_chttp2_transport* t, GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, keepalive_watchdog_fired_locked, t, grpc_combiner_scheduler(t->combiner)); +} - t->goaway_error = GRPC_ERROR_NONE; - grpc_chttp2_goaway_parser_init(&t->goaway_parser); - grpc_chttp2_hpack_parser_init(&t->hpack_parser); - - grpc_slice_buffer_init(&t->read_buffer); - - /* 8 is a random stab in the dark as to a good initial size: it's small enough - that it shouldn't waste memory for infrequently used connections, yet - large enough that the exponential growth should happen nicely when it's - needed. - TODO(ctiller): tune this */ - grpc_chttp2_stream_map_init(&t->stream_map, 8); - - /* copy in initial settings to all setting sets */ - for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) { - for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) { - t->settings[j][i] = grpc_chttp2_settings_parameters[i].default_value; - } - } - t->dirtied_local_settings = 1; - /* Hack: it's common for implementations to assume 65536 bytes initial send - window -- this should by rights be 0 */ - t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; - t->sent_local_settings = 0; - t->write_buffer_size = grpc_core::chttp2::kDefaultWindow; - - if (is_client) { - grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( - GRPC_CHTTP2_CLIENT_CONNECT_STRING)); - } - - /* configure http2 the way we like it */ - if (is_client) { - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); - } - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, - DEFAULT_MAX_HEADER_LIST_SIZE); - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, - 1); - - t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; - t->ping_policy.min_sent_ping_interval_without_data = - g_default_min_sent_ping_interval_without_data_ms; - t->ping_policy.max_ping_strikes = g_default_max_ping_strikes; - t->ping_policy.min_recv_ping_interval_without_data = - g_default_min_recv_ping_interval_without_data_ms; - - /* Keepalive setting */ +static void init_transport_keepalive_settings(grpc_chttp2_transport* t) { if (t->is_client) { t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX ? GRPC_MILLIS_INF_FUTURE @@ -359,205 +462,122 @@ static void init_transport(grpc_chttp2_transport* t, t->keepalive_permit_without_calls = g_default_server_keepalive_permit_without_calls; } +} - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; +static void configure_transport_ping_policy(grpc_chttp2_transport* t) { + t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; + t->ping_policy.min_sent_ping_interval_without_data = + g_default_min_sent_ping_interval_without_data_ms; + t->ping_policy.max_ping_strikes = g_default_max_ping_strikes; + t->ping_policy.min_recv_ping_interval_without_data = + g_default_min_recv_ping_interval_without_data_ms; +} - bool enable_bdp = true; +static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { + if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping_locked); + } else { + /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no + inflight keeaplive timers */ + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; + } +} - if (channel_args) { - for (i = 0; i < channel_args->num_args; i++) { - if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) { - const grpc_integer_options options = {-1, 0, INT_MAX}; - const int value = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - if (value >= 0) { - if ((t->next_stream_id & 1) != (value & 1)) { - gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, - t->next_stream_id & 1, is_client ? "client" : "server"); - } else { - t->next_stream_id = static_cast<uint32_t>(value); - } - } - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { - const grpc_integer_options options = {-1, 0, INT_MAX}; - const int value = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - if (value >= 0) { - grpc_chttp2_hpack_compressor_set_max_usable_size( - &t->hpack_compressor, static_cast<uint32_t>(value)); - } - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { - t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer( - &channel_args->args[i], - {g_default_max_pings_without_data, 0, INT_MAX}); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { - t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer( - &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX}); - } else if (0 == - strcmp( - channel_args->args[i].key, - GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) { - t->ping_policy.min_sent_ping_interval_without_data = - grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{ - g_default_min_sent_ping_interval_without_data_ms, 0, - INT_MAX}); - } else if (0 == - strcmp( - channel_args->args[i].key, - GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) { - t->ping_policy.min_recv_ping_interval_without_data = - grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{ - g_default_min_recv_ping_interval_without_data_ms, 0, - INT_MAX}); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { - t->write_buffer_size = - static_cast<uint32_t>(grpc_channel_arg_get_integer( - &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE})); - } else if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { - enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_TIME_MS)) { - const int value = grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{t->is_client - ? g_default_client_keepalive_time_ms - : g_default_server_keepalive_time_ms, - 1, INT_MAX}); - t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { - const int value = grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{t->is_client - ? g_default_client_keepalive_timeout_ms - : g_default_server_keepalive_timeout_ms, - 0, INT_MAX}); - t->keepalive_timeout = - value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { - t->keepalive_permit_without_calls = static_cast<uint32_t>( - grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1})); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_OPTIMIZATION_TARGET)) { - if (channel_args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "%s should be a string", - GRPC_ARG_OPTIMIZATION_TARGET); - } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; - } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; - } else if (0 == - strcmp(channel_args->args[i].value.string, "throughput")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; - } else { - gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", - GRPC_ARG_OPTIMIZATION_TARGET, - channel_args->args[i].value.string); - } - } else { - static const struct { - const char* channel_arg_name; - grpc_chttp2_setting_id setting_id; - grpc_integer_options integer_options; - bool availability[2] /* server, client */; - } settings_map[] = { - {GRPC_ARG_MAX_CONCURRENT_STREAMS, - GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, - {-1, 0, INT32_MAX}, - {true, false}}, - {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, - GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, - {-1, 0, INT32_MAX}, - {true, true}}, - {GRPC_ARG_MAX_METADATA_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, - {-1, 0, INT32_MAX}, - {true, true}}, - {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - {-1, 16384, 16777215}, - {true, true}}, - {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY, - GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, - {1, 0, 1}, - {true, true}}, - {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, - {-1, 5, INT32_MAX}, - {true, true}}}; - for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) { - if (0 == strcmp(channel_args->args[i].key, - settings_map[j].channel_arg_name)) { - if (!settings_map[j].availability[is_client]) { - gpr_log(GPR_DEBUG, "%s is not available on %s", - settings_map[j].channel_arg_name, - is_client ? "clients" : "servers"); - } else { - int value = grpc_channel_arg_get_integer( - &channel_args->args[i], settings_map[j].integer_options); - if (value >= 0) { - queue_setting_update(t, settings_map[j].setting_id, - static_cast<uint32_t>(value)); - } - } - break; - } - } - } +grpc_chttp2_transport::grpc_chttp2_transport( + const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client, + grpc_resource_user* resource_user) + : ep(ep), + peer_string(grpc_endpoint_get_peer(ep)), + resource_user(resource_user), + combiner(grpc_combiner_create()), + is_client(is_client), + next_stream_id(is_client ? 1 : 2), + deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) { + GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == + GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); + base.vtable = get_vtable(); + /* one ref is for destroy */ + gpr_ref_init(&refs, 1); + /* 8 is a random stab in the dark as to a good initial size: it's small enough + that it shouldn't waste memory for infrequently used connections, yet + large enough that the exponential growth should happen nicely when it's + needed. + TODO(ctiller): tune this */ + grpc_chttp2_stream_map_init(&stream_map, 8); + + grpc_slice_buffer_init(&read_buffer); + grpc_connectivity_state_init( + &channel_callback.state_tracker, GRPC_CHANNEL_READY, + is_client ? "client_transport" : "server_transport"); + grpc_slice_buffer_init(&outbuf); + if (is_client) { + grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string( + GRPC_CHTTP2_CLIENT_CONNECT_STRING)); + } + grpc_chttp2_hpack_compressor_init(&hpack_compressor); + grpc_slice_buffer_init(&qbuf); + /* copy in initial settings to all setting sets */ + size_t i; + int j; + for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) { + for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) { + settings[j][i] = grpc_chttp2_settings_parameters[i].default_value; } } + grpc_chttp2_hpack_parser_init(&hpack_parser); + grpc_chttp2_goaway_parser_init(&goaway_parser); + + init_transport_closures(this); + + /* configure http2 the way we like it */ + if (is_client) { + queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); + queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); + } + queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + DEFAULT_MAX_HEADER_LIST_SIZE); + queue_setting_update(this, + GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1); + + configure_transport_ping_policy(this); + init_transport_keepalive_settings(this); + + bool enable_bdp = true; + if (channel_args) { + enable_bdp = read_channel_args(this, channel_args, is_client); + } if (g_flow_control_enabled) { - t->flow_control.Init<grpc_core::chttp2::TransportFlowControl>(t, - enable_bdp); + flow_control.Init<grpc_core::chttp2::TransportFlowControl>(this, + enable_bdp); } else { - t->flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(t); + flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(this); enable_bdp = false; } /* No pings allowed before receiving a header or data frame. */ - t->ping_state.pings_before_data_required = 0; - t->ping_state.is_delayed_ping_timer_set = false; - t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; + ping_state.pings_before_data_required = 0; + ping_state.is_delayed_ping_timer_set = false; + ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; - t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; - t->ping_recv_state.ping_strikes = 0; + ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; + ping_recv_state.ping_strikes = 0; - /* Start keepalive pings */ - if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); - } else { - /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no - inflight keeaplive timers */ - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; - } + init_keepalive_pings_if_enabled(this); if (enable_bdp) { - GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); - schedule_bdp_ping_locked(t); - - grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t, + GRPC_CHTTP2_REF_TRANSPORT(this, "bdp_ping"); + schedule_bdp_ping_locked(this); + grpc_chttp2_act_on_flowctl_action(flow_control->PeriodicUpdate(), this, nullptr); } - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); - post_benign_reclaimer(t); + grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); + post_benign_reclaimer(this); } static void destroy_transport_locked(void* tp, grpc_error* error) { @@ -567,6 +587,7 @@ static void destroy_transport_locked(void* tp, grpc_error* error) { t, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); + // Must be the last line. GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); } @@ -651,103 +672,108 @@ void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) { } #endif -static int init_stream(grpc_transport* gt, grpc_stream* gs, - grpc_stream_refcount* refcount, const void* server_data, - gpr_arena* arena) { - GPR_TIMER_SCOPE("init_stream", 0); - grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt); - grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs); - - s->t = t; - s->refcount = refcount; +grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, + grpc_stream_refcount* refcount, + const void* server_data, + gpr_arena* arena) + : t(t), refcount(refcount), metadata_buffer{{arena}, {arena}} { /* We reserve one 'active stream' that's dropped when the stream is read-closed. The others are for Chttp2IncomingByteStreams that are actively reading */ - GRPC_CHTTP2_STREAM_REF(s, "chttp2"); - - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); - grpc_chttp2_data_parser_init(&s->data_parser); - grpc_slice_buffer_init(&s->flow_controlled_buffer); - s->deadline = GRPC_MILLIS_INF_FUTURE; - GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s, - grpc_schedule_on_exec_ctx); - grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer); - s->unprocessed_incoming_frames_buffer_cached_length = 0; - grpc_slice_buffer_init(&s->frame_storage); - grpc_slice_buffer_init(&s->compressed_data_buffer); - grpc_slice_buffer_init(&s->decompressed_data_buffer); - s->pending_byte_stream = false; - s->decompressed_header_bytes = 0; - GRPC_CLOSURE_INIT(&s->reset_byte_stream, reset_byte_stream, s, - grpc_combiner_scheduler(t->combiner)); - + GRPC_CHTTP2_STREAM_REF(this, "chttp2"); GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); if (server_data) { - s->id = static_cast<uint32_t>((uintptr_t)server_data); - *t->accepting_stream = s; - grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); + id = static_cast<uint32_t>((uintptr_t)server_data); + *t->accepting_stream = this; + grpc_chttp2_stream_map_add(&t->stream_map, id, this); post_destructive_reclaimer(t); } - if (t->flow_control->flow_control_enabled()) { - s->flow_control.Init<grpc_core::chttp2::StreamFlowControl>( + flow_control.Init<grpc_core::chttp2::StreamFlowControl>( static_cast<grpc_core::chttp2::TransportFlowControl*>( t->flow_control.get()), - s); + this); } else { - s->flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>(); + flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>(); } - return 0; + grpc_slice_buffer_init(&frame_storage); + grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer); + grpc_slice_buffer_init(&flow_controlled_buffer); + grpc_slice_buffer_init(&compressed_data_buffer); + grpc_slice_buffer_init(&decompressed_data_buffer); + + GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, + grpc_combiner_scheduler(t->combiner)); } -static void destroy_stream_locked(void* sp, grpc_error* error) { - GPR_TIMER_SCOPE("destroy_stream", 0); - grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp); - grpc_chttp2_transport* t = s->t; +grpc_chttp2_stream::~grpc_chttp2_stream() { + if (t->channelz_socket != nullptr) { + if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { + t->channelz_socket->RecordStreamSucceeded(); + } else { + t->channelz_socket->RecordStreamFailed(); + } + } - GPR_ASSERT((s->write_closed && s->read_closed) || s->id == 0); - if (s->id != 0) { - GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == nullptr); + GPR_ASSERT((write_closed && read_closed) || id == 0); + if (id != 0) { + GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); } - grpc_slice_buffer_destroy_internal(&s->unprocessed_incoming_frames_buffer); - grpc_slice_buffer_destroy_internal(&s->frame_storage); - grpc_slice_buffer_destroy_internal(&s->compressed_data_buffer); - grpc_slice_buffer_destroy_internal(&s->decompressed_data_buffer); + grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); + grpc_slice_buffer_destroy_internal(&frame_storage); + grpc_slice_buffer_destroy_internal(&compressed_data_buffer); + grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); - grpc_chttp2_list_remove_stalled_by_transport(t, s); - grpc_chttp2_list_remove_stalled_by_stream(t, s); + grpc_chttp2_list_remove_stalled_by_transport(t, this); + grpc_chttp2_list_remove_stalled_by_stream(t, this); for (int i = 0; i < STREAM_LIST_COUNT; i++) { - if (GPR_UNLIKELY(s->included[i])) { + if (GPR_UNLIKELY(included[i])) { gpr_log(GPR_ERROR, "%s stream %d still included in list %d", - t->is_client ? "client" : "server", s->id, i); + t->is_client ? "client" : "server", id, i); abort(); } } - GPR_ASSERT(s->send_initial_metadata_finished == nullptr); - GPR_ASSERT(s->fetching_send_message == nullptr); - GPR_ASSERT(s->send_trailing_metadata_finished == nullptr); - GPR_ASSERT(s->recv_initial_metadata_ready == nullptr); - GPR_ASSERT(s->recv_message_ready == nullptr); - GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr); - grpc_chttp2_data_parser_destroy(&s->data_parser); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]); - grpc_slice_buffer_destroy_internal(&s->flow_controlled_buffer); - GRPC_ERROR_UNREF(s->read_closed_error); - GRPC_ERROR_UNREF(s->write_closed_error); - GRPC_ERROR_UNREF(s->byte_stream_error); + GPR_ASSERT(send_initial_metadata_finished == nullptr); + GPR_ASSERT(fetching_send_message == nullptr); + GPR_ASSERT(send_trailing_metadata_finished == nullptr); + GPR_ASSERT(recv_initial_metadata_ready == nullptr); + GPR_ASSERT(recv_message_ready == nullptr); + GPR_ASSERT(recv_trailing_metadata_finished == nullptr); + grpc_slice_buffer_destroy_internal(&flow_controlled_buffer); + GRPC_ERROR_UNREF(read_closed_error); + GRPC_ERROR_UNREF(write_closed_error); + GRPC_ERROR_UNREF(byte_stream_error); + + flow_control.Destroy(); - s->flow_control.Destroy(); + if (t->resource_user != nullptr) { + grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); + } GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); + GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE); +} - GRPC_CLOSURE_SCHED(s->destroy_stream_arg, GRPC_ERROR_NONE); +static int init_stream(grpc_transport* gt, grpc_stream* gs, + grpc_stream_refcount* refcount, const void* server_data, + gpr_arena* arena) { + GPR_TIMER_SCOPE("init_stream", 0); + grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt); + new (gs) grpc_chttp2_stream(t, refcount, server_data, arena); + return 0; +} + +static void destroy_stream_locked(void* sp, grpc_error* error) { + GPR_TIMER_SCOPE("destroy_stream", 0); + grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp); + s->~grpc_chttp2_stream(); } static void destroy_stream(grpc_transport* gt, grpc_stream* gs, @@ -783,7 +809,21 @@ grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, if (t->channel_callback.accept_stream == nullptr) { return nullptr; } - grpc_chttp2_stream* accepting; + // Don't accept the stream if memory quota doesn't allow. Note that we should + // simply refuse the stream here instead of canceling the stream after it's + // accepted since the latter will create the call which costs much memory. + if (t->resource_user != nullptr && + !grpc_resource_user_safe_alloc(t->resource_user, + GRPC_RESOURCE_QUOTA_CALL_SIZE)) { + gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream."); + grpc_slice_buffer_add( + &t->qbuf, + grpc_chttp2_rst_stream_create( + id, static_cast<uint32_t>(GRPC_HTTP2_REFUSED_STREAM), nullptr)); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); + return nullptr; + } + grpc_chttp2_stream* accepting = nullptr; GPR_ASSERT(t->accepting_stream == nullptr); t->accepting_stream = &accepting; t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data, @@ -1401,6 +1441,9 @@ static void perform_stream_op_locked(void* stream_op, } if (op->send_initial_metadata) { + if (t->is_client && t->channelz_socket != nullptr) { + t->channelz_socket->RecordStreamStartedFromLocal(); + } GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA(); GPR_ASSERT(s->send_initial_metadata_finished == nullptr); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -1486,6 +1529,7 @@ static void perform_stream_op_locked(void* stream_op, if (op->send_message) { GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(); + t->num_messages_in_next_write++; GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE( op->payload->send_message.send_message->length()); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -1670,8 +1714,8 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, gpr_free(str); } - op->handler_private.extra_arg = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); + op->handler_private.extra_arg = gs; GRPC_CLOSURE_SCHED( GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked, op, grpc_combiner_scheduler(t->combiner)), @@ -2097,8 +2141,7 @@ void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s, "add_status_message", grpc_chttp2_incoming_metadata_buffer_replace_or_add( &s->metadata_buffer[1], - grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_ref_internal(slice)))); + grpc_mdelem_create(GRPC_MDSTR_GRPC_MESSAGE, slice, nullptr))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); @@ -2678,6 +2721,7 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_stream_map_size(&t->stream_map) > 0) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); + grpc_timer_init_unset(&t->keepalive_watchdog_timer); send_keepalive_ping_locked(t); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); } else { @@ -2701,6 +2745,9 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE) { return; } + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordKeepaliveSent(); + } GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, @@ -2728,10 +2775,10 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { if (error == GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; close_transport_locked( - t, - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "keepalive watchdog timeout"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL)); + t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "keepalive watchdog timeout"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE)); } } else { /* The watchdog timer should have been cancelled by @@ -2897,17 +2944,20 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, } } +void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() { + if (!stream_->stream_decompression_ctx) { + stream_->stream_decompression_ctx = grpc_stream_compression_context_create( + stream_->stream_decompression_method); + } +} + grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0); grpc_error* error; if (stream_->unprocessed_incoming_frames_buffer.length > 0) { if (!stream_->unprocessed_incoming_frames_decompressed) { bool end_of_context; - if (!stream_->stream_decompression_ctx) { - stream_->stream_decompression_ctx = - grpc_stream_compression_context_create( - stream_->stream_decompression_method); - } + MaybeCreateStreamDecompressionCtx(); if (!grpc_stream_decompress(stream_->stream_decompression_ctx, &stream_->unprocessed_incoming_frames_buffer, &stream_->decompressed_data_buffer, nullptr, @@ -3138,11 +3188,21 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), static const grpc_transport_vtable* get_vtable(void) { return &vtable; } +intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport) { + grpc_chttp2_transport* t = + reinterpret_cast<grpc_chttp2_transport*>(transport); + if (t->channelz_socket != nullptr) { + return t->channelz_socket->uuid(); + } else { + return 0; + } +} + grpc_transport* grpc_create_chttp2_transport( - const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) { - grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>( - gpr_zalloc(sizeof(grpc_chttp2_transport))); - init_transport(t, channel_args, ep, is_client); + const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client, + grpc_resource_user* resource_user) { + auto t = new (gpr_malloc(sizeof(grpc_chttp2_transport))) + grpc_chttp2_transport(channel_args, ep, is_client, resource_user); return &t->base; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index 9d55b3f4b0..b3fe1c082e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -32,7 +32,10 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount; extern bool g_flow_control_enabled; grpc_transport* grpc_create_chttp2_transport( - const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client); + const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client, + grpc_resource_user* resource_user = nullptr); + +intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport); /// Takes ownership of \a read_buffer, which (if non-NULL) contains /// leftover bytes previously read from the endpoint (e.g., by handshakers). diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc index f8f06f6789..1de00735cf 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.cc +++ b/src/core/ext/transport/chttp2/transport/frame_data.cc @@ -32,18 +32,12 @@ #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/transport/transport.h" -grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser) { - parser->state = GRPC_CHTTP2_DATA_FH_0; - parser->parsing_frame = nullptr; - return GRPC_ERROR_NONE; -} - -void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser) { - if (parser->parsing_frame != nullptr) { - GRPC_ERROR_UNREF(parser->parsing_frame->Finished( +grpc_chttp2_data_parser::~grpc_chttp2_data_parser() { + if (parsing_frame != nullptr) { + GRPC_ERROR_UNREF(parsing_frame->Finished( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false)); } - GRPC_ERROR_UNREF(parser->error); + GRPC_ERROR_UNREF(error); } grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser, @@ -62,6 +56,7 @@ grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser, if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) { s->received_last_frame = true; + s->eos_received = true; } else { s->received_last_frame = false; } @@ -191,6 +186,9 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( GPR_ASSERT(stream_out != nullptr); GPR_ASSERT(p->parsing_frame == nullptr); p->frame_size |= (static_cast<uint32_t>(*cur)); + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordMessageReceived(); + } p->state = GRPC_CHTTP2_DATA_FRAME; ++cur; message_flags = 0; diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index e5d01f764e..2c5da99fa6 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -43,20 +43,18 @@ namespace grpc_core { class Chttp2IncomingByteStream; } // namespace grpc_core -typedef struct { - grpc_chttp2_stream_state state; - uint8_t frame_type; - uint32_t frame_size; - grpc_error* error; +struct grpc_chttp2_data_parser { + grpc_chttp2_data_parser() = default; + ~grpc_chttp2_data_parser(); - bool is_frame_compressed; - grpc_core::Chttp2IncomingByteStream* parsing_frame; -} grpc_chttp2_data_parser; + grpc_chttp2_stream_state state = GRPC_CHTTP2_DATA_FH_0; + uint8_t frame_type = 0; + uint32_t frame_size = 0; + grpc_error* error = GRPC_ERROR_NONE; -/* initialize per-stream state for data frame parsing */ -grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser); - -void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser); + bool is_frame_compressed = false; + grpc_core::Chttp2IncomingByteStream* parsing_frame = nullptr; +}; /* start processing a new data frame */ grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser, diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc b/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc index 4bdd4309a4..a0a7534594 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc @@ -32,7 +32,7 @@ grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code, grpc_transport_one_way_stats* stats) { static const size_t frame_size = 13; grpc_slice slice = GRPC_SLICE_MALLOC(frame_size); - stats->framing_bytes += frame_size; + if (stats != nullptr) stats->framing_bytes += frame_size; uint8_t* p = GRPC_SLICE_START_PTR(slice); // Frame size. diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc index 0eaf63f133..dbe9df6ae3 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc @@ -212,10 +212,6 @@ static uint32_t prepare_space_for_new_elem(grpc_chttp2_hpack_compressor* c, return new_index; } -/* dummy function */ -static void add_nothing(grpc_chttp2_hpack_compressor* c, grpc_mdelem elem, - size_t elem_size) {} - // Add a key to the dynamic table. Both key and value will be added to table at // the decoder. static void add_key_with_index(grpc_chttp2_hpack_compressor* c, @@ -524,17 +520,22 @@ static void hpack_enc(grpc_chttp2_hpack_compressor* c, grpc_mdelem elem, uint32_t indices_key; /* should this elem be in the table? */ - size_t decoder_space_usage = - grpc_mdelem_get_size_in_hpack_table(elem, st->use_true_binary_metadata); - bool should_add_elem = elem_interned && - decoder_space_usage < MAX_DECODER_SPACE_USAGE && - c->filter_elems[HASH_FRAGMENT_1(elem_hash)] >= - c->filter_elems_sum / ONE_ON_ADD_PROBABILITY; - void (*maybe_add)(grpc_chttp2_hpack_compressor*, grpc_mdelem, size_t) = - should_add_elem ? add_elem : add_nothing; - void (*emit)(grpc_chttp2_hpack_compressor*, uint32_t, grpc_mdelem, - framer_state*) = - should_add_elem ? emit_lithdr_incidx : emit_lithdr_noidx; + const size_t decoder_space_usage = + grpc_chttp2_get_size_in_hpack_table(elem, st->use_true_binary_metadata); + const bool should_add_elem = elem_interned && + decoder_space_usage < MAX_DECODER_SPACE_USAGE && + c->filter_elems[HASH_FRAGMENT_1(elem_hash)] >= + c->filter_elems_sum / ONE_ON_ADD_PROBABILITY; + + auto emit_maybe_add = [&should_add_elem, &elem, &st, &c, &indices_key, + &decoder_space_usage] { + if (should_add_elem) { + emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); + add_elem(c, elem, decoder_space_usage); + } else { + emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); + } + }; /* no hits for the elem... maybe there's a key? */ indices_key = c->indices_keys[HASH_FRAGMENT_2(key_hash)]; @@ -542,8 +543,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor* c, grpc_mdelem elem, GRPC_MDKEY(elem)) && indices_key > c->tail_remote_index) { /* HIT: key (first cuckoo hash) */ - emit(c, dynidx(c, indices_key), elem, st); - maybe_add(c, elem, decoder_space_usage); + emit_maybe_add(); return; } @@ -552,20 +552,23 @@ static void hpack_enc(grpc_chttp2_hpack_compressor* c, grpc_mdelem elem, GRPC_MDKEY(elem)) && indices_key > c->tail_remote_index) { /* HIT: key (first cuckoo hash) */ - emit(c, dynidx(c, indices_key), elem, st); - maybe_add(c, elem, decoder_space_usage); + emit_maybe_add(); return; } /* no elem, key in the table... fall back to literal emission */ - bool should_add_key = + const bool should_add_key = !elem_interned && decoder_space_usage < MAX_DECODER_SPACE_USAGE; - emit = (should_add_elem || should_add_key) ? emit_lithdr_incidx_v - : emit_lithdr_noidx_v; - maybe_add = - should_add_elem ? add_elem : (should_add_key ? add_key : add_nothing); - emit(c, 0, elem, st); - maybe_add(c, elem, decoder_space_usage); + if (should_add_elem || should_add_key) { + emit_lithdr_incidx_v(c, 0, elem, st); + } else { + emit_lithdr_noidx_v(c, 0, elem, st); + } + if (should_add_elem) { + add_elem(c, elem, decoder_space_usage); + } else if (should_add_key) { + add_key(c, elem, decoder_space_usage); + } } #define STRLEN_LIT(x) (sizeof(x) - 1) @@ -688,11 +691,22 @@ void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor* c, emit_advertise_table_size_change(c, &st); } for (size_t i = 0; i < extra_headers_size; ++i) { - hpack_enc(c, *extra_headers[i], &st); + grpc_mdelem md = *extra_headers[i]; + uint8_t static_index = grpc_chttp2_get_static_hpack_table_index(md); + if (static_index) { + emit_indexed(c, static_index, &st); + } else { + hpack_enc(c, md, &st); + } } grpc_metadata_batch_assert_ok(metadata); for (grpc_linked_mdelem* l = metadata->list.head; l; l = l->next) { - hpack_enc(c, l->md, &st); + uint8_t static_index = grpc_chttp2_get_static_hpack_table_index(l->md); + if (static_index) { + emit_indexed(c, static_index, &st); + } else { + hpack_enc(c, l->md, &st); + } } grpc_millis deadline = metadata->deadline; if (deadline != GRPC_MILLIS_INF_FUTURE) { diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.cc b/src/core/ext/transport/chttp2/transport/hpack_table.cc index 7929258356..fcfb01872b 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_table.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_table.cc @@ -29,6 +29,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/murmur_hash.h" +#include "src/core/lib/transport/static_metadata.h" extern grpc_core::TraceFlag grpc_http_trace; @@ -366,3 +367,31 @@ grpc_chttp2_hptbl_find_result grpc_chttp2_hptbl_find( return r; } + +static size_t get_base64_encoded_size(size_t raw_length) { + static const uint8_t tail_xtra[3] = {0, 2, 3}; + return raw_length / 3 * 4 + tail_xtra[raw_length % 3]; +} + +size_t grpc_chttp2_get_size_in_hpack_table(grpc_mdelem elem, + bool use_true_binary_metadata) { + size_t overhead_and_key = 32 + GRPC_SLICE_LENGTH(GRPC_MDKEY(elem)); + size_t value_len = GRPC_SLICE_LENGTH(GRPC_MDVALUE(elem)); + if (grpc_is_binary_header(GRPC_MDKEY(elem))) { + return overhead_and_key + (use_true_binary_metadata + ? value_len + 1 + : get_base64_encoded_size(value_len)); + } else { + return overhead_and_key + value_len; + } +} + +uint8_t grpc_chttp2_get_static_hpack_table_index(grpc_mdelem md) { + if (GRPC_MDELEM_STORAGE(md) == GRPC_MDELEM_STORAGE_STATIC) { + uint8_t index = GRPC_MDELEM_DATA(md) - grpc_static_mdelem_table; + if (index < GRPC_CHTTP2_LAST_STATIC_ENTRY) { + return index + 1; // Hpack static metadata element indices start at 1 + } + } + return 0; +} diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.h b/src/core/ext/transport/chttp2/transport/hpack_table.h index 98026a4ba4..a0ffc6fab7 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_table.h +++ b/src/core/ext/transport/chttp2/transport/hpack_table.h @@ -83,6 +83,15 @@ grpc_mdelem grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl* tbl, /* add a table entry to the index */ grpc_error* grpc_chttp2_hptbl_add(grpc_chttp2_hptbl* tbl, grpc_mdelem md) GRPC_MUST_USE_RESULT; + +size_t grpc_chttp2_get_size_in_hpack_table(grpc_mdelem elem, + bool use_true_binary_metadata); + +/* Returns the static hpack table index that corresponds to /a elem. Returns 0 + if /a elem is not statically stored or if it is not in the static hpack + table */ +uint8_t grpc_chttp2_get_static_hpack_table_index(grpc_mdelem md); + /* Find a key/value pair in the table... returns the index in the table of the most similar entry, or 0 if the value was not found */ typedef struct { diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.cc b/src/core/ext/transport/chttp2/transport/incoming_metadata.cc index 4d7dfd900f..dca15e7680 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.cc +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.cc @@ -27,18 +27,6 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer* buffer, gpr_arena* arena) { - buffer->arena = arena; - grpc_metadata_batch_init(&buffer->batch); - buffer->batch.deadline = GRPC_MILLIS_INF_FUTURE; -} - -void grpc_chttp2_incoming_metadata_buffer_destroy( - grpc_chttp2_incoming_metadata_buffer* buffer) { - grpc_metadata_batch_destroy(&buffer->batch); -} - grpc_error* grpc_chttp2_incoming_metadata_buffer_add( grpc_chttp2_incoming_metadata_buffer* buffer, grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index d029cf00d4..c551b3cc8b 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -23,17 +23,20 @@ #include "src/core/lib/transport/transport.h" -typedef struct { +struct grpc_chttp2_incoming_metadata_buffer { + grpc_chttp2_incoming_metadata_buffer(gpr_arena* arena) : arena(arena) { + grpc_metadata_batch_init(&batch); + batch.deadline = GRPC_MILLIS_INF_FUTURE; + } + ~grpc_chttp2_incoming_metadata_buffer() { + grpc_metadata_batch_destroy(&batch); + } + gpr_arena* arena; grpc_metadata_batch batch; - size_t size; // total size of metadata -} grpc_chttp2_incoming_metadata_buffer; - -/** assumes everything initially zeroed */ -void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer* buffer, gpr_arena* arena); -void grpc_chttp2_incoming_metadata_buffer_destroy( - grpc_chttp2_incoming_metadata_buffer* buffer); + size_t size = 0; // total size of metadata +}; + void grpc_chttp2_incoming_metadata_buffer_publish( grpc_chttp2_incoming_metadata_buffer* buffer, grpc_metadata_batch* batch); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 32a13df48c..8a83f4894c 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -36,6 +36,7 @@ #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" #include "src/core/ext/transport/chttp2/transport/stream_map.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" @@ -106,8 +107,8 @@ const char* grpc_chttp2_initiate_write_reason_string( grpc_chttp2_initiate_write_reason reason); typedef struct { - grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT]; - uint64_t inflight_id; + grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT] = {}; + uint64_t inflight_id = 0; } grpc_chttp2_ping_queue; typedef struct { @@ -250,6 +251,8 @@ class Chttp2IncomingByteStream : public ByteStream { static void NextLocked(void* arg, grpc_error* error_ignored); static void OrphanLocked(void* arg, grpc_error* error_ignored); + void MaybeCreateStreamDecompressionCtx(); + grpc_chttp2_transport* transport_; // Immutable. grpc_chttp2_stream* stream_; // Immutable. @@ -281,34 +284,41 @@ typedef enum { } grpc_chttp2_keepalive_state; struct grpc_chttp2_transport { + grpc_chttp2_transport(const grpc_channel_args* channel_args, + grpc_endpoint* ep, bool is_client, + grpc_resource_user* resource_user); + ~grpc_chttp2_transport(); + grpc_transport base; /* must be first */ gpr_refcount refs; grpc_endpoint* ep; char* peer_string; + grpc_resource_user* resource_user; + grpc_combiner* combiner; - grpc_closure* notify_on_receive_settings; + grpc_closure* notify_on_receive_settings = nullptr; /** write execution state of the transport */ - grpc_chttp2_write_state write_state; + grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE; /** is this the first write in a series of writes? set when we initiate writing from idle, cleared when we initiate writing from writing+more */ - bool is_first_write_in_batch; + bool is_first_write_in_batch = false; /** is the transport destroying itself? */ - uint8_t destroying; + uint8_t destroying = false; /** has the upper layer closed the transport? */ - grpc_error* closed_with_error; + grpc_error* closed_with_error = GRPC_ERROR_NONE; /** is there a read request to the endpoint outstanding? */ - uint8_t endpoint_reading; + uint8_t endpoint_reading = 1; - grpc_chttp2_optimization_target opt_target; + grpc_chttp2_optimization_target opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; /** various lists of streams */ - grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; + grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {}; /** maps stream id to grpc_chttp2_stream objects */ grpc_chttp2_stream_map stream_map; @@ -325,7 +335,7 @@ struct grpc_chttp2_transport { /** address to place a newly accepted stream - set and unset by grpc_chttp2_parsing_accept_stream; used by init_stream to publish the accepted server stream */ - grpc_chttp2_stream** accepting_stream; + grpc_chttp2_stream** accepting_stream = nullptr; struct { /* accept stream callback */ @@ -349,41 +359,43 @@ struct grpc_chttp2_transport { /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? */ - uint32_t write_buffer_size; + uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow; /** Set to a grpc_error object if a goaway frame is received. By default, set * to GRPC_ERROR_NONE */ - grpc_error* goaway_error; + grpc_error* goaway_error = GRPC_ERROR_NONE; - grpc_chttp2_sent_goaway_state sent_goaway_state; + grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND; /** are the local settings dirty and need to be sent? */ - bool dirtied_local_settings; + bool dirtied_local_settings = true; /** have local settings been sent? */ - bool sent_local_settings; - /** bitmask of setting indexes to send out */ - uint32_t force_send_settings; + bool sent_local_settings = false; + /** bitmask of setting indexes to send out + Hack: it's common for implementations to assume 65536 bytes initial send + window -- this should by rights be 0 */ + uint32_t force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; /** settings values */ uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; /** what is the next stream id to be allocated by this peer? copied to next_stream_id in parsing when parsing commences */ - uint32_t next_stream_id; + uint32_t next_stream_id = 0; /** last new stream id */ - uint32_t last_new_stream_id; + uint32_t last_new_stream_id = 0; /** ping queues for various ping insertion points */ - grpc_chttp2_ping_queue ping_queue; + grpc_chttp2_ping_queue ping_queue = grpc_chttp2_ping_queue(); grpc_chttp2_repeated_ping_policy ping_policy; grpc_chttp2_repeated_ping_state ping_state; - uint64_t ping_ctr; /* unique id for pings */ + uint64_t ping_ctr = 0; /* unique id for pings */ grpc_closure retry_initiate_ping_locked; /** ping acks */ - size_t ping_ack_count; - size_t ping_ack_capacity; - uint64_t* ping_acks; + size_t ping_ack_count = 0; + size_t ping_ack_capacity = 0; + uint64_t* ping_acks = nullptr; grpc_chttp2_server_ping_recv_state ping_recv_state; /** parser for headers */ @@ -409,22 +421,22 @@ struct grpc_chttp2_transport { int64_t initial_window_update = 0; /* deframing */ - grpc_chttp2_deframe_transport_state deframe_state; - uint8_t incoming_frame_type; - uint8_t incoming_frame_flags; - uint8_t header_eof; - bool is_first_frame; - uint32_t expect_continuation_stream_id; - uint32_t incoming_frame_size; - uint32_t incoming_stream_id; + grpc_chttp2_deframe_transport_state deframe_state = GRPC_DTS_CLIENT_PREFIX_0; + uint8_t incoming_frame_type = 0; + uint8_t incoming_frame_flags = 0; + uint8_t header_eof = 0; + bool is_first_frame = true; + uint32_t expect_continuation_stream_id = 0; + uint32_t incoming_frame_size = 0; + uint32_t incoming_stream_id = 0; /* active parser */ - void* parser_data; - grpc_chttp2_stream* incoming_stream; + void* parser_data = nullptr; + grpc_chttp2_stream* incoming_stream = nullptr; grpc_error* (*parser)(void* parser_user_data, grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_slice slice, int is_last); - grpc_chttp2_write_cb* write_cb_pool; + grpc_chttp2_write_cb* write_cb_pool = nullptr; /* bdp estimator */ grpc_closure next_bdp_ping_timer_expired_locked; @@ -433,23 +445,23 @@ struct grpc_chttp2_transport { /* if non-NULL, close the transport with this error when writes are finished */ - grpc_error* close_transport_on_writes_finished; + grpc_error* close_transport_on_writes_finished = GRPC_ERROR_NONE; /* a list of closures to run after writes are finished */ - grpc_closure_list run_after_write; + grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT; /* buffer pool state */ /** have we scheduled a benign cleanup? */ - bool benign_reclaimer_registered; + bool benign_reclaimer_registered = false; /** have we scheduled a destructive cleanup? */ - bool destructive_reclaimer_registered; + bool destructive_reclaimer_registered = false; /** benign cleanup closure */ grpc_closure benign_reclaimer_locked; /** destructive cleanup closure */ grpc_closure destructive_reclaimer_locked; /* next bdp ping timer */ - bool have_next_bdp_ping_timer; + bool have_next_bdp_ping_timer = false; grpc_timer next_bdp_ping_timer; /* keep-alive ping support */ @@ -470,10 +482,12 @@ struct grpc_chttp2_transport { /** grace period for a ping to complete before watchdog kicks in */ grpc_millis keepalive_timeout; /** if keepalive pings are allowed when there's no outstanding streams */ - bool keepalive_permit_without_calls; + bool keepalive_permit_without_calls = false; /** keep-alive state machine state */ grpc_chttp2_keepalive_state keepalive_state; grpc_core::ContextList* cl; + grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket; + uint32_t num_messages_in_next_write = 0; }; typedef enum { @@ -484,6 +498,10 @@ typedef enum { } grpc_published_metadata_method; struct grpc_chttp2_stream { + grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena); + ~grpc_chttp2_stream(); + void* context; grpc_chttp2_transport* t; grpc_stream_refcount* refcount; @@ -492,59 +510,63 @@ struct grpc_chttp2_stream { grpc_closure* destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; - uint8_t included[STREAM_LIST_COUNT]; + uint8_t included[STREAM_LIST_COUNT] = {}; /** HTTP2 stream id for this stream, or zero if one has not been assigned */ - uint32_t id; + uint32_t id = 0; /** things the upper layers would like to send */ - grpc_metadata_batch* send_initial_metadata; - grpc_closure* send_initial_metadata_finished; - grpc_metadata_batch* send_trailing_metadata; - grpc_closure* send_trailing_metadata_finished; + grpc_metadata_batch* send_initial_metadata = nullptr; + grpc_closure* send_initial_metadata_finished = nullptr; + grpc_metadata_batch* send_trailing_metadata = nullptr; + grpc_closure* send_trailing_metadata_finished = nullptr; grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message; - uint32_t fetched_send_message_length; - grpc_slice fetching_slice; + uint32_t fetched_send_message_length = 0; + grpc_slice fetching_slice = grpc_empty_slice(); int64_t next_message_end_offset; - int64_t flow_controlled_bytes_written; - int64_t flow_controlled_bytes_flowed; + int64_t flow_controlled_bytes_written = 0; + int64_t flow_controlled_bytes_flowed = 0; grpc_closure complete_fetch_locked; - grpc_closure* fetching_send_message_finished; + grpc_closure* fetching_send_message_finished = nullptr; grpc_metadata_batch* recv_initial_metadata; - grpc_closure* recv_initial_metadata_ready; - bool* trailing_metadata_available; + grpc_closure* recv_initial_metadata_ready = nullptr; + bool* trailing_metadata_available = nullptr; grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; - grpc_closure* recv_message_ready; + grpc_closure* recv_message_ready = nullptr; grpc_metadata_batch* recv_trailing_metadata; - grpc_closure* recv_trailing_metadata_finished; + grpc_closure* recv_trailing_metadata_finished = nullptr; - grpc_transport_stream_stats* collecting_stats; - grpc_transport_stream_stats stats; + grpc_transport_stream_stats* collecting_stats = nullptr; + grpc_transport_stream_stats stats = grpc_transport_stream_stats(); /** Is this stream closed for writing. */ - bool write_closed; + bool write_closed = false; /** Is this stream reading half-closed. */ - bool read_closed; + bool read_closed = false; /** Are all published incoming byte streams closed. */ - bool all_incoming_byte_streams_finished; + bool all_incoming_byte_streams_finished = false; /** Has this stream seen an error. If true, then pending incoming frames can be thrown away. */ - bool seen_error; + bool seen_error = false; /** Are we buffering writes on this stream? If yes, we won't become writable until there's enough queued up in the flow_controlled_buffer */ - bool write_buffering; + bool write_buffering = false; /** Has trailing metadata been received. */ - bool received_trailing_metadata; + bool received_trailing_metadata = false; + + /* have we sent or received the EOS bit? */ + bool eos_received = false; + bool eos_sent = false; /** the error that resulted in this stream being read-closed */ - grpc_error* read_closed_error; + grpc_error* read_closed_error = GRPC_ERROR_NONE; /** the error that resulted in this stream being write-closed */ - grpc_error* write_closed_error; + grpc_error* write_closed_error = GRPC_ERROR_NONE; - grpc_published_metadata_method published_metadata[2]; - bool final_metadata_requested; + grpc_published_metadata_method published_metadata[2] = {}; + bool final_metadata_requested = false; grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; @@ -554,33 +576,33 @@ struct grpc_chttp2_stream { * Accessed only by application thread when stream->pending_byte_stream == * true */ grpc_slice_buffer unprocessed_incoming_frames_buffer; - grpc_closure* on_next; /* protected by t combiner */ - bool pending_byte_stream; /* protected by t combiner */ + grpc_closure* on_next = nullptr; /* protected by t combiner */ + bool pending_byte_stream = false; /* protected by t combiner */ // cached length of buffer to be used by the transport thread in cases where // stream->pending_byte_stream == true. The value is saved before // application threads are allowed to modify // unprocessed_incoming_frames_buffer - size_t unprocessed_incoming_frames_buffer_cached_length; + size_t unprocessed_incoming_frames_buffer_cached_length = 0; grpc_closure reset_byte_stream; - grpc_error* byte_stream_error; /* protected by t combiner */ - bool received_last_frame; /* protected by t combiner */ + grpc_error* byte_stream_error = GRPC_ERROR_NONE; /* protected by t combiner */ + bool received_last_frame = false; /* protected by t combiner */ - grpc_millis deadline; + grpc_millis deadline = GRPC_MILLIS_INF_FUTURE; /** saw some stream level error */ - grpc_error* forced_close_error; + grpc_error* forced_close_error = GRPC_ERROR_NONE; /** how many header frames have we received? */ - uint8_t header_frames_received; + uint8_t header_frames_received = 0; /** parsing state for data frames */ /* Accessed only by transport thread when stream->pending_byte_stream == false * Accessed only by application thread when stream->pending_byte_stream == * true */ grpc_chttp2_data_parser data_parser; /** number of bytes received - reset at end of parse thread execution */ - int64_t received_bytes; + int64_t received_bytes = 0; - bool sent_initial_metadata; - bool sent_trailing_metadata; + bool sent_initial_metadata = false; + bool sent_trailing_metadata = false; grpc_core::PolymorphicManualConstructor< grpc_core::chttp2::StreamFlowControlBase, @@ -590,32 +612,34 @@ struct grpc_chttp2_stream { grpc_slice_buffer flow_controlled_buffer; - grpc_chttp2_write_cb* on_flow_controlled_cbs; - grpc_chttp2_write_cb* on_write_finished_cbs; - grpc_chttp2_write_cb* finish_after_write; - size_t sending_bytes; + grpc_chttp2_write_cb* on_flow_controlled_cbs = nullptr; + grpc_chttp2_write_cb* on_write_finished_cbs = nullptr; + grpc_chttp2_write_cb* finish_after_write = nullptr; + size_t sending_bytes = 0; /* Stream compression method to be used. */ - grpc_stream_compression_method stream_compression_method; + grpc_stream_compression_method stream_compression_method = + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS; /* Stream decompression method to be used. */ - grpc_stream_compression_method stream_decompression_method; + grpc_stream_compression_method stream_decompression_method = + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS; /** Stream compression decompress context */ - grpc_stream_compression_context* stream_decompression_ctx; + grpc_stream_compression_context* stream_decompression_ctx = nullptr; /** Stream compression compress context */ - grpc_stream_compression_context* stream_compression_ctx; + grpc_stream_compression_context* stream_compression_ctx = nullptr; /** Buffer storing data that is compressed but not sent */ grpc_slice_buffer compressed_data_buffer; /** Amount of uncompressed bytes sent out when compressed_data_buffer is * emptied */ - size_t uncompressed_data_size; + size_t uncompressed_data_size = 0; /** Temporary buffer storing decompressed data */ grpc_slice_buffer decompressed_data_buffer; /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed */ - bool unprocessed_incoming_frames_decompressed; + bool unprocessed_incoming_frames_decompressed = false; /** gRPC header bytes that are already decompressed */ - size_t decompressed_header_bytes; + size_t decompressed_header_bytes = 0; }; /** Transport writing call flow: diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 1e491d2ef8..1ff96d3cd3 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -368,6 +368,7 @@ static grpc_error* init_data_frame_parser(grpc_chttp2_transport* t) { &s->data_parser, t->incoming_frame_flags, s->id, s); } error_handler: + intptr_t unused; if (err == GRPC_ERROR_NONE) { t->incoming_stream = s; /* t->parser = grpc_chttp2_data_parser_parse;*/ @@ -375,7 +376,7 @@ error_handler: t->parser_data = &s->data_parser; t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; return GRPC_ERROR_NONE; - } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, nullptr)) { + } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, &unused)) { /* handle stream errors by closing the stream */ if (s != nullptr) { grpc_chttp2_mark_stream_closed(t, s, true, false, err); @@ -409,67 +410,81 @@ static void on_initial_header(void* tp, grpc_mdelem md) { gpr_free(value); } - if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_STATUS) && - !grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) { - /* TODO(ctiller): check for a status like " 0" */ - s->seen_error = true; - } + if (GRPC_MDELEM_STORAGE(md) == GRPC_MDELEM_STORAGE_STATIC) { + // We don't use grpc_mdelem_eq here to avoid executing additional + // instructions. The reasoning is if the payload is not equal, we already + // know that the metadata elements are not equal because the md is + // confirmed to be static. If we had used grpc_mdelem_eq here, then if the + // payloads are not equal, grpc_mdelem_eq executes more instructions to + // determine if they're equal or not. + if (md.payload == GRPC_MDELEM_GRPC_STATUS_1.payload || + md.payload == GRPC_MDELEM_GRPC_STATUS_2.payload) { + s->seen_error = true; + } + } else { + if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_STATUS) && + !grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) { + /* TODO(ctiller): check for a status like " 0" */ + s->seen_error = true; + } - if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_TIMEOUT)) { - grpc_millis* cached_timeout = - static_cast<grpc_millis*>(grpc_mdelem_get_user_data(md, free_timeout)); - grpc_millis timeout; - if (cached_timeout != nullptr) { - timeout = *cached_timeout; - } else { - if (GPR_UNLIKELY( - !grpc_http2_decode_timeout(GRPC_MDVALUE(md), &timeout))) { - char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); - gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val); - gpr_free(val); - timeout = GRPC_MILLIS_INF_FUTURE; + if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_TIMEOUT)) { + grpc_millis* cached_timeout = static_cast<grpc_millis*>( + grpc_mdelem_get_user_data(md, free_timeout)); + grpc_millis timeout; + if (cached_timeout != nullptr) { + timeout = *cached_timeout; + } else { + if (GPR_UNLIKELY( + !grpc_http2_decode_timeout(GRPC_MDVALUE(md), &timeout))) { + char* val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val); + gpr_free(val); + timeout = GRPC_MILLIS_INF_FUTURE; + } + if (GRPC_MDELEM_IS_INTERNED(md)) { + /* store the result */ + cached_timeout = + static_cast<grpc_millis*>(gpr_malloc(sizeof(grpc_millis))); + *cached_timeout = timeout; + grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); + } } - if (GRPC_MDELEM_IS_INTERNED(md)) { - /* store the result */ - cached_timeout = - static_cast<grpc_millis*>(gpr_malloc(sizeof(grpc_millis))); - *cached_timeout = timeout; - grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); + if (timeout != GRPC_MILLIS_INF_FUTURE) { + grpc_chttp2_incoming_metadata_buffer_set_deadline( + &s->metadata_buffer[0], grpc_core::ExecCtx::Get()->Now() + timeout); } + GRPC_MDELEM_UNREF(md); + return; } - if (timeout != GRPC_MILLIS_INF_FUTURE) { - grpc_chttp2_incoming_metadata_buffer_set_deadline( - &s->metadata_buffer[0], grpc_core::ExecCtx::Get()->Now() + timeout); - } + } + + const size_t new_size = s->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md); + const size_t metadata_size_limit = + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; + if (new_size > metadata_size_limit) { + gpr_log(GPR_DEBUG, + "received initial metadata size exceeds limit (%" PRIuPTR + " vs. %" PRIuPTR ")", + new_size, metadata_size_limit); + grpc_chttp2_cancel_stream( + t, s, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "received initial metadata size exceeds limit"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_RESOURCE_EXHAUSTED)); + grpc_chttp2_parsing_become_skip_parser(t); + s->seen_error = true; GRPC_MDELEM_UNREF(md); } else { - const size_t new_size = s->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md); - const size_t metadata_size_limit = - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; - if (new_size > metadata_size_limit) { - gpr_log(GPR_DEBUG, - "received initial metadata size exceeds limit (%" PRIuPTR - " vs. %" PRIuPTR ")", - new_size, metadata_size_limit); - grpc_chttp2_cancel_stream( - t, s, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "received initial metadata size exceeds limit"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); + grpc_error* error = + grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(t, s, error); grpc_chttp2_parsing_become_skip_parser(t); s->seen_error = true; GRPC_MDELEM_UNREF(md); - } else { - grpc_error* error = - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md); - if (error != GRPC_ERROR_NONE) { - grpc_chttp2_cancel_stream(t, s, error); - grpc_chttp2_parsing_become_skip_parser(t); - s->seen_error = true; - GRPC_MDELEM_UNREF(md); - } } } } @@ -491,8 +506,19 @@ static void on_trailing_header(void* tp, grpc_mdelem md) { gpr_free(value); } - if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_STATUS) && - !grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) { + if (GRPC_MDELEM_STORAGE(md) == GRPC_MDELEM_STORAGE_STATIC) { + // We don't use grpc_mdelem_eq here to avoid executing additional + // instructions. The reasoning is if the payload is not equal, we already + // know that the metadata elements are not equal because the md is + // confirmed to be static. If we had used grpc_mdelem_eq here, then if the + // payloads are not equal, grpc_mdelem_eq executes more instructions to + // determine if they're equal or not. + if (md.payload == GRPC_MDELEM_GRPC_STATUS_1.payload || + md.payload == GRPC_MDELEM_GRPC_STATUS_2.payload) { + s->seen_error = true; + } + } else if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_STATUS) && + !grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) { /* TODO(ctiller): check for a status like " 0" */ s->seen_error = true; } @@ -598,6 +624,9 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted")); return init_skip_frame_parser(t, 1); } + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordStreamStartedFromRemote(); + } } else { t->incoming_stream = s; } @@ -611,6 +640,9 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, } t->parser = grpc_chttp2_header_parser_parse; t->parser_data = &t->hpack_parser; + if (t->header_eof) { + s->eos_received = true; + } switch (s->header_frames_received) { case 0: if (t->is_client && t->header_eof) { @@ -725,9 +757,10 @@ static grpc_error* parse_frame_slice(grpc_chttp2_transport* t, grpc_slice slice, int is_last) { grpc_chttp2_stream* s = t->incoming_stream; grpc_error* err = t->parser(t->parser_data, t, s, slice, is_last); + intptr_t unused; if (GPR_LIKELY(err == GRPC_ERROR_NONE)) { return err; - } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, nullptr)) { + } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, &unused)) { if (grpc_http_trace.enabled()) { const char* msg = grpc_error_string(err); gpr_log(GPR_ERROR, "%s", msg); diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index c9273f7e39..3b3367d0f3 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -574,6 +574,7 @@ class StreamWriteContext { void SentLastFrame() { s_->send_trailing_metadata = nullptr; s_->sent_trailing_metadata = true; + s_->eos_sent = true; if (!t_->is_client && !s_->read_closed) { grpc_slice_buffer_add( @@ -637,6 +638,11 @@ void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) { GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0); grpc_chttp2_stream* s; + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write); + } + t->num_messages_in_next_write = 0; + while (grpc_chttp2_list_pop_writing_stream(t, &s)) { if (s->sending_bytes != 0) { update_list(t, s, static_cast<int64_t>(s->sending_bytes), diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 4a252d972d..349d8681d5 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -111,16 +111,21 @@ typedef struct grpc_cronet_transport grpc_cronet_transport; /* TODO (makdharma): reorder structure for memory efficiency per http://www.catb.org/esr/structure-packing/#_structure_reordering: */ struct read_state { + read_state(gpr_arena* arena) + : trailing_metadata(arena), initial_metadata(arena) { + grpc_slice_buffer_init(&read_slice_buffer); + } + /* vars to store data coming from server */ - char* read_buffer; - bool length_field_received; - int received_bytes; - int remaining_bytes; - int length_field; - bool compressed; - char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]; - char* payload_field; - bool read_stream_closed; + char* read_buffer = nullptr; + bool length_field_received = false; + int received_bytes = 0; + int remaining_bytes = 0; + int length_field = 0; + bool compressed = 0; + char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {}; + char* payload_field = nullptr; + bool read_stream_closed = 0; /* vars for holding data destined for the application */ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs; @@ -128,59 +133,71 @@ struct read_state { /* vars for trailing metadata */ grpc_chttp2_incoming_metadata_buffer trailing_metadata; - bool trailing_metadata_valid; + bool trailing_metadata_valid = false; /* vars for initial metadata */ grpc_chttp2_incoming_metadata_buffer initial_metadata; }; struct write_state { - char* write_buffer; + char* write_buffer = nullptr; }; /* track state of one stream op */ struct op_state { - bool state_op_done[OP_NUM_OPS]; - bool state_callback_received[OP_NUM_OPS]; + op_state(gpr_arena* arena) : rs(arena) {} + + bool state_op_done[OP_NUM_OPS] = {}; + bool state_callback_received[OP_NUM_OPS] = {}; /* A non-zero gRPC status code has been seen */ - bool fail_state; + bool fail_state = false; /* Transport is discarding all buffered messages */ - bool flush_read; - bool flush_cronet_when_ready; - bool pending_write_for_trailer; - bool pending_send_message; + bool flush_read = false; + bool flush_cronet_when_ready = false; + bool pending_write_for_trailer = false; + bool pending_send_message = false; /* User requested RECV_TRAILING_METADATA */ - bool pending_recv_trailing_metadata; + bool pending_recv_trailing_metadata = false; /* Cronet has not issued a callback of a bidirectional read */ - bool pending_read_from_cronet; - grpc_error* cancel_error; + bool pending_read_from_cronet = false; + grpc_error* cancel_error = GRPC_ERROR_NONE; /* data structure for storing data coming from server */ struct read_state rs; /* data structure for storing data going to the server */ struct write_state ws; }; +struct stream_obj; + struct op_and_state { + op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op); + grpc_transport_stream_op_batch op; struct op_state state; - bool done; - struct stream_obj* s; /* Pointer back to the stream object */ - struct op_and_state* next; /* next op_and_state in the linked list */ + bool done = false; + struct stream_obj* s; /* Pointer back to the stream object */ + /* next op_and_state in the linked list */ + struct op_and_state* next; }; struct op_storage { - int num_pending_ops; - struct op_and_state* head; + int num_pending_ops = 0; + struct op_and_state* head = nullptr; }; struct stream_obj { + stream_obj(grpc_transport* gt, grpc_stream* gs, + grpc_stream_refcount* refcount, gpr_arena* arena); + ~stream_obj(); + gpr_arena* arena; - struct op_and_state* oas; - grpc_transport_stream_op_batch* curr_op; + struct op_and_state* oas = nullptr; + grpc_transport_stream_op_batch* curr_op = nullptr; grpc_cronet_transport* curr_ct; grpc_stream* curr_gs; - bidirectional_stream* cbs; - bidirectional_stream_header_array header_array; + bidirectional_stream* cbs = nullptr; + bidirectional_stream_header_array header_array = + bidirectional_stream_header_array(); // Zero-initialize the structure. /* Stream level state. Some state will be tracked both at stream and stream_op * level */ @@ -195,7 +212,6 @@ struct stream_obj { /* Refcount object of the stream */ grpc_stream_refcount* refcount; }; -typedef struct stream_obj stream_obj; #ifndef NDEBUG #define GRPC_CRONET_STREAM_REF(stream, reason) \ @@ -306,6 +322,10 @@ static grpc_error* make_error_with_desc(int error_code, const char* desc) { return error; } +inline op_and_state::op_and_state(stream_obj* s, + const grpc_transport_stream_op_batch& op) + : op(op), state(s->arena), s(s), next(s->storage.head) {} + /* Add a new stream op to op storage. */ @@ -314,14 +334,8 @@ static void add_to_storage(struct stream_obj* s, struct op_storage* storage = &s->storage; /* add new op at the beginning of the linked list. The memory is freed in remove_from_storage */ - struct op_and_state* new_op = static_cast<struct op_and_state*>( - gpr_malloc(sizeof(struct op_and_state))); - memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch)); - memset(&new_op->state, 0, sizeof(new_op->state)); - new_op->s = s; - new_op->done = false; + op_and_state* new_op = grpc_core::New<op_and_state>(s, *op); gpr_mu_lock(&s->mu); - new_op->next = storage->head; storage->head = new_op; storage->num_pending_ops++; if (op->send_message) { @@ -347,7 +361,7 @@ static void remove_from_storage(struct stream_obj* s, } if (s->storage.head == oas) { s->storage.head = oas->next; - gpr_free(oas); + grpc_core::Delete(oas); s->storage.num_pending_ops--; CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas, s->storage.num_pending_ops); @@ -358,7 +372,7 @@ static void remove_from_storage(struct stream_obj* s, s->storage.num_pending_ops--; CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas, s->storage.num_pending_ops); - gpr_free(oas); + grpc_core::Delete(oas); break; } else if (GPR_UNLIKELY(curr->next == nullptr)) { CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free"); @@ -540,10 +554,6 @@ static void on_response_headers_received( } gpr_mu_lock(&s->mu); - memset(&s->state.rs.initial_metadata, 0, - sizeof(s->state.rs.initial_metadata)); - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, - s->arena); convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata); s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -634,11 +644,7 @@ static void on_response_trailers_received( stream_obj* s = static_cast<stream_obj*>(stream->annotation); grpc_cronet_transport* t = s->curr_ct; gpr_mu_lock(&s->mu); - memset(&s->state.rs.trailing_metadata, 0, - sizeof(s->state.rs.trailing_metadata)); s->state.rs.trailing_metadata_valid = false; - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata, - s->arena); convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata); if (trailers->count > 0) { s->state.rs.trailing_metadata_valid = true; @@ -1287,7 +1293,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { grpc_error* error = GRPC_ERROR_NONE; if (stream_state->state_op_done[OP_CANCEL_ERROR]) { error = GRPC_ERROR_REF(stream_state->cancel_error); - } else if (stream_state->state_op_done[OP_FAILED]) { + } else if (stream_state->state_callback_received[OP_FAILED]) { error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."); } else if (oas->s->state.rs.trailing_metadata_valid) { grpc_chttp2_incoming_metadata_buffer_publish( @@ -1354,36 +1360,28 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { Functions used by upper layers to access transport functionality. */ +inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs, + grpc_stream_refcount* refcount, gpr_arena* arena) + : arena(arena), + curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)), + curr_gs(gs), + state(arena), + refcount(refcount) { + GRPC_CRONET_STREAM_REF(this, "cronet transport"); + gpr_mu_init(&mu); +} + +inline stream_obj::~stream_obj() { + null_and_maybe_free_read_buffer(this); + /* Clean up read_slice_buffer in case there is unread data. */ + grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer); + GRPC_ERROR_UNREF(state.cancel_error); +} + static int init_stream(grpc_transport* gt, grpc_stream* gs, grpc_stream_refcount* refcount, const void* server_data, gpr_arena* arena) { - stream_obj* s = reinterpret_cast<stream_obj*>(gs); - - s->refcount = refcount; - GRPC_CRONET_STREAM_REF(s, "cronet transport"); - memset(&s->storage, 0, sizeof(s->storage)); - s->storage.head = nullptr; - memset(&s->state, 0, sizeof(s->state)); - s->curr_op = nullptr; - s->cbs = nullptr; - memset(&s->header_array, 0, sizeof(s->header_array)); - memset(&s->state.rs, 0, sizeof(s->state.rs)); - memset(&s->state.ws, 0, sizeof(s->state.ws)); - memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done)); - memset(s->state.state_callback_received, 0, - sizeof(s->state.state_callback_received)); - s->state.fail_state = s->state.flush_read = false; - s->state.cancel_error = nullptr; - s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; - s->state.pending_send_message = false; - s->state.pending_recv_trailing_metadata = false; - s->state.pending_read_from_cronet = false; - - s->curr_gs = gs; - s->curr_ct = reinterpret_cast<grpc_cronet_transport*>(gt); - s->arena = arena; - - gpr_mu_init(&s->mu); + new (gs) stream_obj(gt, gs, refcount, arena); return 0; } @@ -1426,10 +1424,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, static void destroy_stream(grpc_transport* gt, grpc_stream* gs, grpc_closure* then_schedule_closure) { stream_obj* s = reinterpret_cast<stream_obj*>(gs); - null_and_maybe_free_read_buffer(s); - /* Clean up read_slice_buffer in case there is unread data. */ - grpc_slice_buffer_destroy_internal(&s->state.rs.read_slice_buffer); - GRPC_ERROR_UNREF(s->state.cancel_error); + s->~stream_obj(); GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index b0ca7f8207..61968de4d5 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -40,18 +40,68 @@ if (grpc_inproc_trace.enabled()) gpr_log(__VA_ARGS__); \ } while (0) -static grpc_slice g_empty_slice; -static grpc_slice g_fake_path_key; -static grpc_slice g_fake_path_value; -static grpc_slice g_fake_auth_key; -static grpc_slice g_fake_auth_value; +namespace { +grpc_slice g_empty_slice; +grpc_slice g_fake_path_key; +grpc_slice g_fake_path_value; +grpc_slice g_fake_auth_key; +grpc_slice g_fake_auth_value; + +struct inproc_stream; +bool cancel_stream_locked(inproc_stream* s, grpc_error* error); +void op_state_machine(void* arg, grpc_error* error); +void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, + bool is_initial); +grpc_error* fill_in_metadata(inproc_stream* s, + const grpc_metadata_batch* metadata, + uint32_t flags, grpc_metadata_batch* out_md, + uint32_t* outflags, bool* markfilled); + +struct shared_mu { + shared_mu() { + // Share one lock between both sides since both sides get affected + gpr_mu_init(&mu); + gpr_ref_init(&refs, 2); + } -typedef struct { gpr_mu mu; gpr_refcount refs; -} shared_mu; +}; + +struct inproc_transport { + inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu, + bool is_client) + : mu(mu), is_client(is_client) { + base.vtable = vtable; + // Start each side of transport with 2 refs since they each have a ref + // to the other + gpr_ref_init(&refs, 2); + grpc_connectivity_state_init(&connectivity, GRPC_CHANNEL_READY, + is_client ? "inproc_client" : "inproc_server"); + } + + ~inproc_transport() { + grpc_connectivity_state_destroy(&connectivity); + if (gpr_unref(&mu->refs)) { + gpr_free(mu); + } + } + + void ref() { + INPROC_LOG(GPR_INFO, "ref_transport %p", this); + gpr_ref(&refs); + } + + void unref() { + INPROC_LOG(GPR_INFO, "unref_transport %p", this); + if (!gpr_unref(&refs)) { + return; + } + INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this); + this->~inproc_transport(); + gpr_free(this); + } -typedef struct inproc_transport { grpc_transport base; shared_mu* mu; gpr_refcount refs; @@ -60,128 +110,174 @@ typedef struct inproc_transport { void (*accept_stream_cb)(void* user_data, grpc_transport* transport, const void* server_data); void* accept_stream_data; - bool is_closed; + bool is_closed = false; struct inproc_transport* other_side; - struct inproc_stream* stream_list; -} inproc_transport; + struct inproc_stream* stream_list = nullptr; +}; + +struct inproc_stream { + inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena) + : t(t), refs(refcount), arena(arena) { + // Ref this stream right now for ctor and list. + ref("inproc_init_stream:init"); + ref("inproc_init_stream:list"); + + grpc_metadata_batch_init(&to_read_initial_md); + grpc_metadata_batch_init(&to_read_trailing_md); + GRPC_CLOSURE_INIT(&op_closure, op_state_machine, this, + grpc_schedule_on_exec_ctx); + grpc_metadata_batch_init(&write_buffer_initial_md); + grpc_metadata_batch_init(&write_buffer_trailing_md); + + stream_list_prev = nullptr; + gpr_mu_lock(&t->mu->mu); + stream_list_next = t->stream_list; + if (t->stream_list) { + t->stream_list->stream_list_prev = this; + } + t->stream_list = this; + gpr_mu_unlock(&t->mu->mu); + + if (!server_data) { + t->ref(); + inproc_transport* st = t->other_side; + st->ref(); + other_side = nullptr; // will get filled in soon + // Pass the client-side stream address to the server-side for a ref + ref("inproc_init_stream:clt"); // ref it now on behalf of server + // side to avoid destruction + INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", + st->accept_stream_cb, st->accept_stream_data); + (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)this); + } else { + // This is the server-side and is being called through accept_stream_cb + inproc_stream* cs = (inproc_stream*)server_data; + other_side = cs; + // Ref the server-side stream on behalf of the client now + ref("inproc_init_stream:srv"); + + // Now we are about to affect the other side, so lock the transport + // to make sure that it doesn't get destroyed + gpr_mu_lock(&t->mu->mu); + cs->other_side = this; + // Now transfer from the other side's write_buffer if any to the to_read + // buffer + if (cs->write_buffer_initial_md_filled) { + fill_in_metadata(this, &cs->write_buffer_initial_md, + cs->write_buffer_initial_md_flags, &to_read_initial_md, + &to_read_initial_md_flags, &to_read_initial_md_filled); + deadline = GPR_MIN(deadline, cs->write_buffer_deadline); + grpc_metadata_batch_clear(&cs->write_buffer_initial_md); + cs->write_buffer_initial_md_filled = false; + } + if (cs->write_buffer_trailing_md_filled) { + fill_in_metadata(this, &cs->write_buffer_trailing_md, 0, + &to_read_trailing_md, nullptr, + &to_read_trailing_md_filled); + grpc_metadata_batch_clear(&cs->write_buffer_trailing_md); + cs->write_buffer_trailing_md_filled = false; + } + if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) { + cancel_other_error = cs->write_buffer_cancel_error; + cs->write_buffer_cancel_error = GRPC_ERROR_NONE; + } + + gpr_mu_unlock(&t->mu->mu); + } + } + + ~inproc_stream() { + GRPC_ERROR_UNREF(write_buffer_cancel_error); + GRPC_ERROR_UNREF(cancel_self_error); + GRPC_ERROR_UNREF(cancel_other_error); + + if (recv_inited) { + grpc_slice_buffer_destroy_internal(&recv_message); + } + + t->unref(); + + if (closure_at_destroy) { + GRPC_CLOSURE_SCHED(closure_at_destroy, GRPC_ERROR_NONE); + } + } + +#ifndef NDEBUG +#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason) +#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason) +#else +#define STREAM_REF(refs, reason) grpc_stream_ref(refs) +#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs) +#endif + void ref(const char* reason) { + INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason); + STREAM_REF(refs, reason); + } + + void unref(const char* reason) { + INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason); + STREAM_UNREF(refs, reason); + } +#undef STREAM_REF +#undef STREAM_UNREF -typedef struct inproc_stream { inproc_transport* t; grpc_metadata_batch to_read_initial_md; - uint32_t to_read_initial_md_flags; - bool to_read_initial_md_filled; + uint32_t to_read_initial_md_flags = 0; + bool to_read_initial_md_filled = false; grpc_metadata_batch to_read_trailing_md; - bool to_read_trailing_md_filled; - bool ops_needed; - bool op_closure_scheduled; + bool to_read_trailing_md_filled = false; + bool ops_needed = false; + bool op_closure_scheduled = false; grpc_closure op_closure; // Write buffer used only during gap at init time when client-side // stream is set up but server side stream is not yet set up grpc_metadata_batch write_buffer_initial_md; - bool write_buffer_initial_md_filled; - uint32_t write_buffer_initial_md_flags; - grpc_millis write_buffer_deadline; + bool write_buffer_initial_md_filled = false; + uint32_t write_buffer_initial_md_flags = 0; + grpc_millis write_buffer_deadline = GRPC_MILLIS_INF_FUTURE; grpc_metadata_batch write_buffer_trailing_md; - bool write_buffer_trailing_md_filled; - grpc_error* write_buffer_cancel_error; + bool write_buffer_trailing_md_filled = false; + grpc_error* write_buffer_cancel_error = GRPC_ERROR_NONE; struct inproc_stream* other_side; - bool other_side_closed; // won't talk anymore - bool write_buffer_other_side_closed; // on hold + bool other_side_closed = false; // won't talk anymore + bool write_buffer_other_side_closed = false; // on hold grpc_stream_refcount* refs; - grpc_closure* closure_at_destroy; + grpc_closure* closure_at_destroy = nullptr; gpr_arena* arena; - grpc_transport_stream_op_batch* send_message_op; - grpc_transport_stream_op_batch* send_trailing_md_op; - grpc_transport_stream_op_batch* recv_initial_md_op; - grpc_transport_stream_op_batch* recv_message_op; - grpc_transport_stream_op_batch* recv_trailing_md_op; + grpc_transport_stream_op_batch* send_message_op = nullptr; + grpc_transport_stream_op_batch* send_trailing_md_op = nullptr; + grpc_transport_stream_op_batch* recv_initial_md_op = nullptr; + grpc_transport_stream_op_batch* recv_message_op = nullptr; + grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr; grpc_slice_buffer recv_message; grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream; - bool recv_inited; + bool recv_inited = false; - bool initial_md_sent; - bool trailing_md_sent; - bool initial_md_recvd; - bool trailing_md_recvd; + bool initial_md_sent = false; + bool trailing_md_sent = false; + bool initial_md_recvd = false; + bool trailing_md_recvd = false; - bool closed; + bool closed = false; - grpc_error* cancel_self_error; - grpc_error* cancel_other_error; + grpc_error* cancel_self_error = GRPC_ERROR_NONE; + grpc_error* cancel_other_error = GRPC_ERROR_NONE; - grpc_millis deadline; + grpc_millis deadline = GRPC_MILLIS_INF_FUTURE; - bool listed; + bool listed = true; struct inproc_stream* stream_list_prev; struct inproc_stream* stream_list_next; -} inproc_stream; - -static bool cancel_stream_locked(inproc_stream* s, grpc_error* error); -static void op_state_machine(void* arg, grpc_error* error); - -static void ref_transport(inproc_transport* t) { - INPROC_LOG(GPR_INFO, "ref_transport %p", t); - gpr_ref(&t->refs); -} - -static void really_destroy_transport(inproc_transport* t) { - INPROC_LOG(GPR_INFO, "really_destroy_transport %p", t); - grpc_connectivity_state_destroy(&t->connectivity); - if (gpr_unref(&t->mu->refs)) { - gpr_free(t->mu); - } - gpr_free(t); -} - -static void unref_transport(inproc_transport* t) { - INPROC_LOG(GPR_INFO, "unref_transport %p", t); - if (gpr_unref(&t->refs)) { - really_destroy_transport(t); - } -} - -#ifndef NDEBUG -#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason) -#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason) -#else -#define STREAM_REF(refs, reason) grpc_stream_ref(refs) -#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs) -#endif - -static void ref_stream(inproc_stream* s, const char* reason) { - INPROC_LOG(GPR_INFO, "ref_stream %p %s", s, reason); - STREAM_REF(s->refs, reason); -} - -static void unref_stream(inproc_stream* s, const char* reason) { - INPROC_LOG(GPR_INFO, "unref_stream %p %s", s, reason); - STREAM_UNREF(s->refs, reason); -} - -static void really_destroy_stream(inproc_stream* s) { - INPROC_LOG(GPR_INFO, "really_destroy_stream %p", s); +}; - GRPC_ERROR_UNREF(s->write_buffer_cancel_error); - GRPC_ERROR_UNREF(s->cancel_self_error); - GRPC_ERROR_UNREF(s->cancel_other_error); - - if (s->recv_inited) { - grpc_slice_buffer_destroy_internal(&s->recv_message); - } - - unref_transport(s->t); - - if (s->closure_at_destroy) { - GRPC_CLOSURE_SCHED(s->closure_at_destroy, GRPC_ERROR_NONE); - } -} - -static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, - bool is_initial) { +void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, + bool is_initial) { for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr; md = md->next) { char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md)); @@ -193,10 +289,10 @@ static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, } } -static grpc_error* fill_in_metadata(inproc_stream* s, - const grpc_metadata_batch* metadata, - uint32_t flags, grpc_metadata_batch* out_md, - uint32_t* outflags, bool* markfilled) { +grpc_error* fill_in_metadata(inproc_stream* s, + const grpc_metadata_batch* metadata, + uint32_t flags, grpc_metadata_batch* out_md, + uint32_t* outflags, bool* markfilled) { if (grpc_inproc_trace.enabled()) { log_metadata(metadata, s->t->is_client, outflags != nullptr); } @@ -221,109 +317,16 @@ static grpc_error* fill_in_metadata(inproc_stream* s, return error; } -static int init_stream(grpc_transport* gt, grpc_stream* gs, - grpc_stream_refcount* refcount, const void* server_data, - gpr_arena* arena) { +int init_stream(grpc_transport* gt, grpc_stream* gs, + grpc_stream_refcount* refcount, const void* server_data, + gpr_arena* arena) { INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data); inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); - inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); - s->arena = arena; - - s->refs = refcount; - // Ref this stream right now - ref_stream(s, "inproc_init_stream:init"); - - grpc_metadata_batch_init(&s->to_read_initial_md); - s->to_read_initial_md_flags = 0; - s->to_read_initial_md_filled = false; - grpc_metadata_batch_init(&s->to_read_trailing_md); - s->to_read_trailing_md_filled = false; - grpc_metadata_batch_init(&s->write_buffer_initial_md); - s->write_buffer_initial_md_flags = 0; - s->write_buffer_initial_md_filled = false; - grpc_metadata_batch_init(&s->write_buffer_trailing_md); - s->write_buffer_trailing_md_filled = false; - s->ops_needed = false; - s->op_closure_scheduled = false; - GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s, - grpc_schedule_on_exec_ctx); - s->t = t; - s->closure_at_destroy = nullptr; - s->other_side_closed = false; - - s->initial_md_sent = s->trailing_md_sent = s->initial_md_recvd = - s->trailing_md_recvd = false; - - s->closed = false; - - s->cancel_self_error = GRPC_ERROR_NONE; - s->cancel_other_error = GRPC_ERROR_NONE; - s->write_buffer_cancel_error = GRPC_ERROR_NONE; - s->deadline = GRPC_MILLIS_INF_FUTURE; - s->write_buffer_deadline = GRPC_MILLIS_INF_FUTURE; - - s->stream_list_prev = nullptr; - gpr_mu_lock(&t->mu->mu); - s->listed = true; - ref_stream(s, "inproc_init_stream:list"); - s->stream_list_next = t->stream_list; - if (t->stream_list) { - t->stream_list->stream_list_prev = s; - } - t->stream_list = s; - gpr_mu_unlock(&t->mu->mu); - - if (!server_data) { - ref_transport(t); - inproc_transport* st = t->other_side; - ref_transport(st); - s->other_side = nullptr; // will get filled in soon - // Pass the client-side stream address to the server-side for a ref - ref_stream(s, "inproc_init_stream:clt"); // ref it now on behalf of server - // side to avoid destruction - INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", st->accept_stream_cb, - st->accept_stream_data); - (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)s); - } else { - // This is the server-side and is being called through accept_stream_cb - inproc_stream* cs = (inproc_stream*)server_data; - s->other_side = cs; - // Ref the server-side stream on behalf of the client now - ref_stream(s, "inproc_init_stream:srv"); - - // Now we are about to affect the other side, so lock the transport - // to make sure that it doesn't get destroyed - gpr_mu_lock(&s->t->mu->mu); - cs->other_side = s; - // Now transfer from the other side's write_buffer if any to the to_read - // buffer - if (cs->write_buffer_initial_md_filled) { - fill_in_metadata(s, &cs->write_buffer_initial_md, - cs->write_buffer_initial_md_flags, - &s->to_read_initial_md, &s->to_read_initial_md_flags, - &s->to_read_initial_md_filled); - s->deadline = GPR_MIN(s->deadline, cs->write_buffer_deadline); - grpc_metadata_batch_clear(&cs->write_buffer_initial_md); - cs->write_buffer_initial_md_filled = false; - } - if (cs->write_buffer_trailing_md_filled) { - fill_in_metadata(s, &cs->write_buffer_trailing_md, 0, - &s->to_read_trailing_md, nullptr, - &s->to_read_trailing_md_filled); - grpc_metadata_batch_clear(&cs->write_buffer_trailing_md); - cs->write_buffer_trailing_md_filled = false; - } - if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) { - s->cancel_other_error = cs->write_buffer_cancel_error; - cs->write_buffer_cancel_error = GRPC_ERROR_NONE; - } - - gpr_mu_unlock(&s->t->mu->mu); - } + new (gs) inproc_stream(t, refcount, server_data, arena); return 0; // return value is not important } -static void close_stream_locked(inproc_stream* s) { +void close_stream_locked(inproc_stream* s) { if (!s->closed) { // Release the metadata that we would have written out grpc_metadata_batch_destroy(&s->write_buffer_initial_md); @@ -341,21 +344,21 @@ static void close_stream_locked(inproc_stream* s) { n->stream_list_prev = p; } s->listed = false; - unref_stream(s, "close_stream:list"); + s->unref("close_stream:list"); } s->closed = true; - unref_stream(s, "close_stream:closing"); + s->unref("close_stream:closing"); } } // This function means that we are done talking/listening to the other side -static void close_other_side_locked(inproc_stream* s, const char* reason) { +void close_other_side_locked(inproc_stream* s, const char* reason) { if (s->other_side != nullptr) { // First release the metadata that came from the other side's arena grpc_metadata_batch_destroy(&s->to_read_initial_md); grpc_metadata_batch_destroy(&s->to_read_trailing_md); - unref_stream(s->other_side, reason); + s->other_side->unref(reason); s->other_side_closed = true; s->other_side = nullptr; } else if (!s->other_side_closed) { @@ -367,9 +370,9 @@ static void close_other_side_locked(inproc_stream* s, const char* reason) { // this stream_op_batch is only one of the pending operations for this // stream. This is called when one of the pending operations for the stream // is done and about to be NULLed out -static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, - grpc_transport_stream_op_batch* op, - const char* msg) { +void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, + grpc_transport_stream_op_batch* op, + const char* msg) { int is_sm = static_cast<int>(op == s->send_message_op); int is_stm = static_cast<int>(op == s->send_trailing_md_op); // TODO(vjpai): We should not consider the recv ops here, since they @@ -386,8 +389,7 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, } } -static void maybe_schedule_op_closure_locked(inproc_stream* s, - grpc_error* error) { +void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) { if (s && s->ops_needed && !s->op_closure_scheduled) { GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error)); s->op_closure_scheduled = true; @@ -395,7 +397,7 @@ static void maybe_schedule_op_closure_locked(inproc_stream* s, } } -static void fail_helper_locked(inproc_stream* s, grpc_error* error) { +void fail_helper_locked(inproc_stream* s, grpc_error* error) { INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s); // If we're failing this side, we need to make sure that // we also send or have already sent trailing metadata @@ -525,8 +527,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { // that the incoming byte stream's next() call will always return // synchronously. That assumption is true today but may not always be // true in the future. -static void message_transfer_locked(inproc_stream* sender, - inproc_stream* receiver) { +void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { size_t remaining = sender->send_message_op->payload->send_message.send_message->length(); if (receiver->recv_inited) { @@ -572,7 +573,7 @@ static void message_transfer_locked(inproc_stream* sender, sender->send_message_op = nullptr; } -static void op_state_machine(void* arg, grpc_error* error) { +void op_state_machine(void* arg, grpc_error* error) { // This function gets called when we have contents in the unprocessed reads // Get what we want based on our ops wanted // Schedule our appropriate closures @@ -607,10 +608,8 @@ static void op_state_machine(void* arg, grpc_error* error) { if (other->recv_message_op) { message_transfer_locked(s, other); maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); - } else if (!s->t->is_client && - (s->trailing_md_sent || other->recv_trailing_md_op)) { - // A server send will never be matched if the client is waiting - // for trailing metadata already + } else if (!s->t->is_client && s->trailing_md_sent) { + // A server send will never be matched if the server already sent status s->send_message_op->payload->send_message.send_message.reset(); complete_if_batch_end_locked( s, GRPC_ERROR_NONE, s->send_message_op, @@ -621,11 +620,15 @@ static void op_state_machine(void* arg, grpc_error* error) { // Pause a send trailing metadata if there is still an outstanding // send message unless we know that the send message will never get // matched to a receive. This happens on the client if the server has - // already sent status. + // already sent status or on the server if the client has requested + // status if (s->send_trailing_md_op && (!s->send_message_op || (s->t->is_client && - (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) { + (s->trailing_md_recvd || s->to_read_trailing_md_filled)) || + (!s->t->is_client && other && + (other->trailing_md_recvd || other->to_read_trailing_md_filled || + other->recv_trailing_md_op)))) { grpc_metadata_batch* dest = (other == nullptr) ? &s->write_buffer_trailing_md : &other->to_read_trailing_md; @@ -723,16 +726,6 @@ static void op_state_machine(void* arg, grpc_error* error) { maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); } } - if (s->recv_trailing_md_op && s->t->is_client && other && - other->send_message_op) { - INPROC_LOG(GPR_INFO, - "op_state_machine %p scheduling trailing-metadata-ready %p", s, - GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata - .recv_trailing_metadata_ready, - GRPC_ERROR_NONE); - maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); - } if (s->to_read_trailing_md_filled) { if (s->trailing_md_recvd) { new_err = @@ -748,6 +741,7 @@ static void op_state_machine(void* arg, grpc_error* error) { if (s->recv_message_op != nullptr) { // This message needs to be wrapped up because it will never be // satisfied + *s->recv_message_op->payload->recv_message.recv_message = nullptr; INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( s->recv_message_op->payload->recv_message.recv_message_ready, @@ -810,6 +804,7 @@ static void op_state_machine(void* arg, grpc_error* error) { // No further message will come on this stream, so finish off the // recv_message_op INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); + *s->recv_message_op->payload->recv_message.recv_message = nullptr; GRPC_CLOSURE_SCHED( s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); @@ -847,7 +842,7 @@ done: GRPC_ERROR_UNREF(new_err); } -static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { +bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { bool ret = false; // was the cancel accepted INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, grpc_error_string(error)); if (s->cancel_self_error == GRPC_ERROR_NONE) { @@ -900,10 +895,10 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { return ret; } -static void do_nothing(void* arg, grpc_error* error) {} +void do_nothing(void* arg, grpc_error* error) {} -static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, - grpc_transport_stream_op_batch* op) { +void perform_stream_op(grpc_transport* gt, grpc_stream* gs, + grpc_transport_stream_op_batch* op) { INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op); inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed @@ -1012,18 +1007,18 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, } // We want to initiate the closure if: - // 1. We want to send a message and the other side wants to receive or end + // 1. We want to send a message and the other side wants to receive // 2. We want to send trailing metadata and there isn't an unmatched send + // or the other side wants trailing metadata // 3. We want initial metadata and the other side has sent it // 4. We want to receive a message and there is a message ready // 5. There is trailing metadata, even if nothing specifically wants // that because that can shut down the receive message as well - if ((op->send_message && other && - ((other->recv_message_op != nullptr) || - (other->recv_trailing_md_op != nullptr))) || - (op->send_trailing_metadata && !op->send_message) || + if ((op->send_message && other && other->recv_message_op != nullptr) || + (op->send_trailing_metadata && + (!s->send_message_op || (other && other->recv_trailing_md_op))) || (op->recv_initial_metadata && s->to_read_initial_md_filled) || - (op->recv_message && other && (other->send_message_op != nullptr)) || + (op->recv_message && other && other->send_message_op != nullptr) || (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { if (!s->op_closure_scheduled) { GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE); @@ -1083,7 +1078,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, GRPC_ERROR_UNREF(error); } -static void close_transport_locked(inproc_transport* t) { +void close_transport_locked(inproc_transport* t) { INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed); grpc_connectivity_state_set( &t->connectivity, GRPC_CHANNEL_SHUTDOWN, @@ -1103,7 +1098,7 @@ static void close_transport_locked(inproc_transport* t) { } } -static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { +void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op); gpr_mu_lock(&t->mu->mu); @@ -1136,39 +1131,64 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { gpr_mu_unlock(&t->mu->mu); } -static void destroy_stream(grpc_transport* gt, grpc_stream* gs, - grpc_closure* then_schedule_closure) { +void destroy_stream(grpc_transport* gt, grpc_stream* gs, + grpc_closure* then_schedule_closure) { INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure); inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); s->closure_at_destroy = then_schedule_closure; - really_destroy_stream(s); + s->~inproc_stream(); } -static void destroy_transport(grpc_transport* gt) { +void destroy_transport(grpc_transport* gt) { inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); INPROC_LOG(GPR_INFO, "destroy_transport %p", t); gpr_mu_lock(&t->mu->mu); close_transport_locked(t); gpr_mu_unlock(&t->mu->mu); - unref_transport(t->other_side); - unref_transport(t); + t->other_side->unref(); + t->unref(); } /******************************************************************************* * INTEGRATION GLUE */ -static void set_pollset(grpc_transport* gt, grpc_stream* gs, - grpc_pollset* pollset) { +void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* pollset) { // Nothing to do here } -static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, - grpc_pollset_set* pollset_set) { +void set_pollset_set(grpc_transport* gt, grpc_stream* gs, + grpc_pollset_set* pollset_set) { // Nothing to do here } -static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; } +grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; } + +const grpc_transport_vtable inproc_vtable = { + sizeof(inproc_stream), "inproc", init_stream, + set_pollset, set_pollset_set, perform_stream_op, + perform_transport_op, destroy_stream, destroy_transport, + get_endpoint}; + +/******************************************************************************* + * Main inproc transport functions + */ +void inproc_transports_create(grpc_transport** server_transport, + const grpc_channel_args* server_args, + grpc_transport** client_transport, + const grpc_channel_args* client_args) { + INPROC_LOG(GPR_INFO, "inproc_transports_create"); + shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu(); + inproc_transport* st = new (gpr_malloc(sizeof(*st))) + inproc_transport(&inproc_vtable, mu, /*is_client=*/false); + inproc_transport* ct = new (gpr_malloc(sizeof(*ct))) + inproc_transport(&inproc_vtable, mu, /*is_client=*/true); + st->other_side = ct; + ct->other_side = st; + *server_transport = reinterpret_cast<grpc_transport*>(st); + *client_transport = reinterpret_cast<grpc_transport*>(ct); +} +} // namespace /******************************************************************************* * GLOBAL INIT AND DESTROY @@ -1190,48 +1210,6 @@ void grpc_inproc_transport_init(void) { g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); } -static const grpc_transport_vtable inproc_vtable = { - sizeof(inproc_stream), "inproc", init_stream, - set_pollset, set_pollset_set, perform_stream_op, - perform_transport_op, destroy_stream, destroy_transport, - get_endpoint}; - -/******************************************************************************* - * Main inproc transport functions - */ -static void inproc_transports_create(grpc_transport** server_transport, - const grpc_channel_args* server_args, - grpc_transport** client_transport, - const grpc_channel_args* client_args) { - INPROC_LOG(GPR_INFO, "inproc_transports_create"); - inproc_transport* st = - static_cast<inproc_transport*>(gpr_zalloc(sizeof(*st))); - inproc_transport* ct = - static_cast<inproc_transport*>(gpr_zalloc(sizeof(*ct))); - // Share one lock between both sides since both sides get affected - st->mu = ct->mu = static_cast<shared_mu*>(gpr_malloc(sizeof(*st->mu))); - gpr_mu_init(&st->mu->mu); - gpr_ref_init(&st->mu->refs, 2); - st->base.vtable = &inproc_vtable; - ct->base.vtable = &inproc_vtable; - // Start each side of transport with 2 refs since they each have a ref - // to the other - gpr_ref_init(&st->refs, 2); - gpr_ref_init(&ct->refs, 2); - st->is_client = false; - ct->is_client = true; - grpc_connectivity_state_init(&st->connectivity, GRPC_CHANNEL_READY, - "inproc_server"); - grpc_connectivity_state_init(&ct->connectivity, GRPC_CHANNEL_READY, - "inproc_client"); - st->other_side = ct; - ct->other_side = st; - st->stream_list = nullptr; - ct->stream_list = nullptr; - *server_transport = reinterpret_cast<grpc_transport*>(st); - *client_transport = reinterpret_cast<grpc_transport*>(ct); -} - grpc_channel* grpc_inproc_channel_create(grpc_server* server, grpc_channel_args* args, void* reserved) { @@ -1256,7 +1234,9 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server, inproc_transports_create(&server_transport, server_args, &client_transport, client_args); - grpc_server_setup_transport(server, server_transport, nullptr, server_args); + // TODO(ncteisen): design and support channelz GetSocket for inproc. + grpc_server_setup_transport(server, server_transport, nullptr, server_args, + 0); grpc_channel* channel = grpc_channel_create( "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport); |