diff options
author | Soheil Hassas Yeganeh <soheil@google.com> | 2018-11-02 15:20:02 -0400 |
---|---|---|
committer | Soheil Hassas Yeganeh <soheil@google.com> | 2018-11-05 10:12:39 -0500 |
commit | 48e4a81b05f2ad6541d72e819cd4f638055f13d5 (patch) | |
tree | 8d5b64b7113721afb2eb4a0363cbd3fd7e47ff41 /src/core/ext/transport | |
parent | 5e6c4491bf60aa91bd3e4fed3c8203601a4c795e (diff) |
Remeve memset(0) from arena allocated memory.
Callers are updated to properly initialize the memory.
This behavior can be overridden using GRPC_ARENA_INIT_STRATEGY
environment variable.
Diffstat (limited to 'src/core/ext/transport')
8 files changed, 615 insertions, 663 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 7c2dc0bcbe..978ecd59e4 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -154,53 +154,52 @@ bool g_flow_control_enabled = true; /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ - -static void destruct_transport(grpc_chttp2_transport* t) { +grpc_chttp2_transport::~grpc_chttp2_transport() { size_t i; - if (t->channelz_socket != nullptr) { - t->channelz_socket.reset(); + if (channelz_socket != nullptr) { + channelz_socket.reset(); } - grpc_endpoint_destroy(t->ep); + grpc_endpoint_destroy(ep); - grpc_slice_buffer_destroy_internal(&t->qbuf); + grpc_slice_buffer_destroy_internal(&qbuf); - grpc_slice_buffer_destroy_internal(&t->outbuf); - grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); + grpc_slice_buffer_destroy_internal(&outbuf); + grpc_chttp2_hpack_compressor_destroy(&hpack_compressor); - grpc_slice_buffer_destroy_internal(&t->read_buffer); - grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); - grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); + grpc_slice_buffer_destroy_internal(&read_buffer); + grpc_chttp2_hpack_parser_destroy(&hpack_parser); + grpc_chttp2_goaway_parser_destroy(&goaway_parser); for (i = 0; i < STREAM_LIST_COUNT; i++) { - GPR_ASSERT(t->lists[i].head == nullptr); - GPR_ASSERT(t->lists[i].tail == nullptr); + GPR_ASSERT(lists[i].head == nullptr); + GPR_ASSERT(lists[i].tail == nullptr); } - GRPC_ERROR_UNREF(t->goaway_error); + GRPC_ERROR_UNREF(goaway_error); - GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0); + GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0); - grpc_chttp2_stream_map_destroy(&t->stream_map); - grpc_connectivity_state_destroy(&t->channel_callback.state_tracker); + grpc_chttp2_stream_map_destroy(&stream_map); + grpc_connectivity_state_destroy(&channel_callback.state_tracker); - GRPC_COMBINER_UNREF(t->combiner, "chttp2_transport"); + GRPC_COMBINER_UNREF(combiner, "chttp2_transport"); - cancel_pings(t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); + cancel_pings(this, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); - while (t->write_cb_pool) { - grpc_chttp2_write_cb* next = t->write_cb_pool->next; - gpr_free(t->write_cb_pool); - t->write_cb_pool = next; + while (write_cb_pool) { + grpc_chttp2_write_cb* next = write_cb_pool->next; + gpr_free(write_cb_pool); + write_cb_pool = next; } - t->flow_control.Destroy(); + flow_control.Destroy(); - GRPC_ERROR_UNREF(t->closed_with_error); - gpr_free(t->ping_acks); - gpr_free(t->peer_string); - gpr_free(t); + GRPC_ERROR_UNREF(closed_with_error); + gpr_free(ping_acks); + gpr_free(peer_string); } #ifndef NDEBUG @@ -212,7 +211,8 @@ void grpc_chttp2_unref_transport(grpc_chttp2_transport* t, const char* reason, t, val, val - 1, reason, file, line); } if (!gpr_unref(&t->refs)) return; - destruct_transport(t); + t->~grpc_chttp2_transport(); + gpr_free(t); } void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason, @@ -227,7 +227,8 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason, #else void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) { if (!gpr_unref(&t->refs)) return; - destruct_transport(t); + t->~grpc_chttp2_transport(); + gpr_free(t); } void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } @@ -481,115 +482,97 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { } } -static void init_transport(grpc_chttp2_transport* t, - const grpc_channel_args* channel_args, - grpc_endpoint* ep, bool is_client, - grpc_resource_user* resource_user) { +grpc_chttp2_transport::grpc_chttp2_transport( + const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client, + grpc_resource_user* resource_user) + : ep(ep), + peer_string(grpc_endpoint_get_peer(ep)), + resource_user(resource_user), + combiner(grpc_combiner_create()), + is_client(is_client), + next_stream_id(is_client ? 1 : 2), + deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) { GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); - - t->base.vtable = get_vtable(); - t->ep = ep; + base.vtable = get_vtable(); /* one ref is for destroy */ - gpr_ref_init(&t->refs, 1); - t->combiner = grpc_combiner_create(); - t->peer_string = grpc_endpoint_get_peer(ep); - t->endpoint_reading = 1; - t->next_stream_id = is_client ? 1 : 2; - t->is_client = is_client; - t->resource_user = resource_user; - t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; - t->is_first_frame = true; - grpc_connectivity_state_init( - &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, - is_client ? "client_transport" : "server_transport"); - - grpc_slice_buffer_init(&t->qbuf); - grpc_slice_buffer_init(&t->outbuf); - grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); - - init_transport_closures(t); - - t->goaway_error = GRPC_ERROR_NONE; - grpc_chttp2_goaway_parser_init(&t->goaway_parser); - grpc_chttp2_hpack_parser_init(&t->hpack_parser); - - grpc_slice_buffer_init(&t->read_buffer); - + gpr_ref_init(&refs, 1); /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet large enough that the exponential growth should happen nicely when it's needed. TODO(ctiller): tune this */ - grpc_chttp2_stream_map_init(&t->stream_map, 8); + grpc_chttp2_stream_map_init(&stream_map, 8); + grpc_slice_buffer_init(&read_buffer); + grpc_connectivity_state_init( + &channel_callback.state_tracker, GRPC_CHANNEL_READY, + is_client ? "client_transport" : "server_transport"); + grpc_slice_buffer_init(&outbuf); + if (is_client) { + grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string( + GRPC_CHTTP2_CLIENT_CONNECT_STRING)); + } + grpc_chttp2_hpack_compressor_init(&hpack_compressor); + grpc_slice_buffer_init(&qbuf); /* copy in initial settings to all setting sets */ size_t i; int j; for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) { for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) { - t->settings[j][i] = grpc_chttp2_settings_parameters[i].default_value; + settings[j][i] = grpc_chttp2_settings_parameters[i].default_value; } } - t->dirtied_local_settings = 1; - /* Hack: it's common for implementations to assume 65536 bytes initial send - window -- this should by rights be 0 */ - t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; - t->sent_local_settings = 0; - t->write_buffer_size = grpc_core::chttp2::kDefaultWindow; + grpc_chttp2_hpack_parser_init(&hpack_parser); + grpc_chttp2_goaway_parser_init(&goaway_parser); - if (is_client) { - grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( - GRPC_CHTTP2_CLIENT_CONNECT_STRING)); - } + init_transport_closures(this); /* configure http2 the way we like it */ if (is_client) { - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); + queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); + queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, - 1); - - configure_transport_ping_policy(t); - init_transport_keepalive_settings(t); + queue_setting_update(this, + GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1); - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + configure_transport_ping_policy(this); + init_transport_keepalive_settings(this); bool enable_bdp = true; if (channel_args) { - enable_bdp = read_channel_args(t, channel_args, is_client); + enable_bdp = read_channel_args(this, channel_args, is_client); } if (g_flow_control_enabled) { - t->flow_control.Init<grpc_core::chttp2::TransportFlowControl>(t, - enable_bdp); + flow_control.Init<grpc_core::chttp2::TransportFlowControl>(this, + enable_bdp); } else { - t->flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(t); + flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(this); enable_bdp = false; } /* No pings allowed before receiving a header or data frame. */ - t->ping_state.pings_before_data_required = 0; - t->ping_state.is_delayed_ping_timer_set = false; - t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; + ping_state.pings_before_data_required = 0; + ping_state.is_delayed_ping_timer_set = false; + ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; - t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; - t->ping_recv_state.ping_strikes = 0; + ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; + ping_recv_state.ping_strikes = 0; - init_keepalive_pings_if_enabled(t); + init_keepalive_pings_if_enabled(this); if (enable_bdp) { - GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); - schedule_bdp_ping_locked(t); - grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t, + GRPC_CHTTP2_REF_TRANSPORT(this, "bdp_ping"); + schedule_bdp_ping_locked(this); + grpc_chttp2_act_on_flowctl_action(flow_control->PeriodicUpdate(), this, nullptr); } - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); - post_benign_reclaimer(t); + grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); + post_benign_reclaimer(this); } static void destroy_transport_locked(void* tp, grpc_error* error) { @@ -599,6 +582,7 @@ static void destroy_transport_locked(void* tp, grpc_error* error) { t, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); + // Must be the last line. GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); } @@ -683,115 +667,108 @@ void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) { } #endif -static int init_stream(grpc_transport* gt, grpc_stream* gs, - grpc_stream_refcount* refcount, const void* server_data, - gpr_arena* arena) { - GPR_TIMER_SCOPE("init_stream", 0); - grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt); - grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs); - - s->t = t; - s->refcount = refcount; +grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, + grpc_stream_refcount* refcount, + const void* server_data, + gpr_arena* arena) + : t(t), refcount(refcount), metadata_buffer{{arena}, {arena}} { /* We reserve one 'active stream' that's dropped when the stream is read-closed. The others are for Chttp2IncomingByteStreams that are actively reading */ - GRPC_CHTTP2_STREAM_REF(s, "chttp2"); - - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); - grpc_chttp2_data_parser_init(&s->data_parser); - grpc_slice_buffer_init(&s->flow_controlled_buffer); - s->deadline = GRPC_MILLIS_INF_FUTURE; - GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s, - grpc_schedule_on_exec_ctx); - grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer); - s->unprocessed_incoming_frames_buffer_cached_length = 0; - grpc_slice_buffer_init(&s->frame_storage); - grpc_slice_buffer_init(&s->compressed_data_buffer); - grpc_slice_buffer_init(&s->decompressed_data_buffer); - s->pending_byte_stream = false; - s->decompressed_header_bytes = 0; - GRPC_CLOSURE_INIT(&s->reset_byte_stream, reset_byte_stream, s, - grpc_combiner_scheduler(t->combiner)); - + GRPC_CHTTP2_STREAM_REF(this, "chttp2"); GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); if (server_data) { - s->id = static_cast<uint32_t>((uintptr_t)server_data); - *t->accepting_stream = s; - grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); + id = static_cast<uint32_t>((uintptr_t)server_data); + *t->accepting_stream = this; + grpc_chttp2_stream_map_add(&t->stream_map, id, this); post_destructive_reclaimer(t); } - if (t->flow_control->flow_control_enabled()) { - s->flow_control.Init<grpc_core::chttp2::StreamFlowControl>( + flow_control.Init<grpc_core::chttp2::StreamFlowControl>( static_cast<grpc_core::chttp2::TransportFlowControl*>( t->flow_control.get()), - s); + this); } else { - s->flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>(); + flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>(); } - return 0; -} + grpc_slice_buffer_init(&frame_storage); + grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer); + grpc_slice_buffer_init(&flow_controlled_buffer); + grpc_slice_buffer_init(&compressed_data_buffer); + grpc_slice_buffer_init(&decompressed_data_buffer); -static void destroy_stream_locked(void* sp, grpc_error* error) { - GPR_TIMER_SCOPE("destroy_stream", 0); - grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp); - grpc_chttp2_transport* t = s->t; + GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, + grpc_combiner_scheduler(t->combiner)); +} +grpc_chttp2_stream::~grpc_chttp2_stream() { if (t->channelz_socket != nullptr) { - if ((t->is_client && s->eos_received) || (!t->is_client && s->eos_sent)) { + if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { t->channelz_socket->RecordStreamSucceeded(); } else { t->channelz_socket->RecordStreamFailed(); } } - GPR_ASSERT((s->write_closed && s->read_closed) || s->id == 0); - if (s->id != 0) { - GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == nullptr); + GPR_ASSERT((write_closed && read_closed) || id == 0); + if (id != 0) { + GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); } - grpc_slice_buffer_destroy_internal(&s->unprocessed_incoming_frames_buffer); - grpc_slice_buffer_destroy_internal(&s->frame_storage); - grpc_slice_buffer_destroy_internal(&s->compressed_data_buffer); - grpc_slice_buffer_destroy_internal(&s->decompressed_data_buffer); + grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); + grpc_slice_buffer_destroy_internal(&frame_storage); + grpc_slice_buffer_destroy_internal(&compressed_data_buffer); + grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); - grpc_chttp2_list_remove_stalled_by_transport(t, s); - grpc_chttp2_list_remove_stalled_by_stream(t, s); + grpc_chttp2_list_remove_stalled_by_transport(t, this); + grpc_chttp2_list_remove_stalled_by_stream(t, this); for (int i = 0; i < STREAM_LIST_COUNT; i++) { - if (GPR_UNLIKELY(s->included[i])) { + if (GPR_UNLIKELY(included[i])) { gpr_log(GPR_ERROR, "%s stream %d still included in list %d", - t->is_client ? "client" : "server", s->id, i); + t->is_client ? "client" : "server", id, i); abort(); } } - GPR_ASSERT(s->send_initial_metadata_finished == nullptr); - GPR_ASSERT(s->fetching_send_message == nullptr); - GPR_ASSERT(s->send_trailing_metadata_finished == nullptr); - GPR_ASSERT(s->recv_initial_metadata_ready == nullptr); - GPR_ASSERT(s->recv_message_ready == nullptr); - GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr); - grpc_chttp2_data_parser_destroy(&s->data_parser); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]); - grpc_slice_buffer_destroy_internal(&s->flow_controlled_buffer); - GRPC_ERROR_UNREF(s->read_closed_error); - GRPC_ERROR_UNREF(s->write_closed_error); - GRPC_ERROR_UNREF(s->byte_stream_error); + GPR_ASSERT(send_initial_metadata_finished == nullptr); + GPR_ASSERT(fetching_send_message == nullptr); + GPR_ASSERT(send_trailing_metadata_finished == nullptr); + GPR_ASSERT(recv_initial_metadata_ready == nullptr); + GPR_ASSERT(recv_message_ready == nullptr); + GPR_ASSERT(recv_trailing_metadata_finished == nullptr); + grpc_slice_buffer_destroy_internal(&flow_controlled_buffer); + GRPC_ERROR_UNREF(read_closed_error); + GRPC_ERROR_UNREF(write_closed_error); + GRPC_ERROR_UNREF(byte_stream_error); - s->flow_control.Destroy(); + flow_control.Destroy(); if (t->resource_user != nullptr) { grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); } GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); + GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE); +} + +static int init_stream(grpc_transport* gt, grpc_stream* gs, + grpc_stream_refcount* refcount, const void* server_data, + gpr_arena* arena) { + GPR_TIMER_SCOPE("init_stream", 0); + grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt); + new (gs) grpc_chttp2_stream(t, refcount, server_data, arena); + return 0; +} - GRPC_CLOSURE_SCHED(s->destroy_stream_arg, GRPC_ERROR_NONE); +static void destroy_stream_locked(void* sp, grpc_error* error) { + GPR_TIMER_SCOPE("destroy_stream", 0); + grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp); + s->~grpc_chttp2_stream(); } static void destroy_stream(grpc_transport* gt, grpc_stream* gs, @@ -1726,8 +1703,8 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, gpr_free(str); } - op->handler_private.extra_arg = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); + op->handler_private.extra_arg = gs; GRPC_CLOSURE_SCHED( GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked, op, grpc_combiner_scheduler(t->combiner)), @@ -2733,6 +2710,7 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_stream_map_size(&t->stream_map) > 0) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); + grpc_timer_init_unset(&t->keepalive_watchdog_timer); send_keepalive_ping_locked(t); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); } else { @@ -3212,9 +3190,8 @@ intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport) { grpc_transport* grpc_create_chttp2_transport( const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client, grpc_resource_user* resource_user) { - grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>( - gpr_zalloc(sizeof(grpc_chttp2_transport))); - init_transport(t, channel_args, ep, is_client, resource_user); + auto t = new (gpr_malloc(sizeof(grpc_chttp2_transport))) + grpc_chttp2_transport(channel_args, ep, is_client, resource_user); return &t->base; } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc index 933b32c03c..1de00735cf 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.cc +++ b/src/core/ext/transport/chttp2/transport/frame_data.cc @@ -32,18 +32,12 @@ #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/transport/transport.h" -grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser) { - parser->state = GRPC_CHTTP2_DATA_FH_0; - parser->parsing_frame = nullptr; - return GRPC_ERROR_NONE; -} - -void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser) { - if (parser->parsing_frame != nullptr) { - GRPC_ERROR_UNREF(parser->parsing_frame->Finished( +grpc_chttp2_data_parser::~grpc_chttp2_data_parser() { + if (parsing_frame != nullptr) { + GRPC_ERROR_UNREF(parsing_frame->Finished( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false)); } - GRPC_ERROR_UNREF(parser->error); + GRPC_ERROR_UNREF(error); } grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser, diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index e5d01f764e..2c5da99fa6 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -43,20 +43,18 @@ namespace grpc_core { class Chttp2IncomingByteStream; } // namespace grpc_core -typedef struct { - grpc_chttp2_stream_state state; - uint8_t frame_type; - uint32_t frame_size; - grpc_error* error; +struct grpc_chttp2_data_parser { + grpc_chttp2_data_parser() = default; + ~grpc_chttp2_data_parser(); - bool is_frame_compressed; - grpc_core::Chttp2IncomingByteStream* parsing_frame; -} grpc_chttp2_data_parser; + grpc_chttp2_stream_state state = GRPC_CHTTP2_DATA_FH_0; + uint8_t frame_type = 0; + uint32_t frame_size = 0; + grpc_error* error = GRPC_ERROR_NONE; -/* initialize per-stream state for data frame parsing */ -grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser); - -void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser); + bool is_frame_compressed = false; + grpc_core::Chttp2IncomingByteStream* parsing_frame = nullptr; +}; /* start processing a new data frame */ grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser, diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.cc b/src/core/ext/transport/chttp2/transport/incoming_metadata.cc index 4d7dfd900f..dca15e7680 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.cc +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.cc @@ -27,18 +27,6 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer* buffer, gpr_arena* arena) { - buffer->arena = arena; - grpc_metadata_batch_init(&buffer->batch); - buffer->batch.deadline = GRPC_MILLIS_INF_FUTURE; -} - -void grpc_chttp2_incoming_metadata_buffer_destroy( - grpc_chttp2_incoming_metadata_buffer* buffer) { - grpc_metadata_batch_destroy(&buffer->batch); -} - grpc_error* grpc_chttp2_incoming_metadata_buffer_add( grpc_chttp2_incoming_metadata_buffer* buffer, grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index d029cf00d4..c551b3cc8b 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -23,17 +23,20 @@ #include "src/core/lib/transport/transport.h" -typedef struct { +struct grpc_chttp2_incoming_metadata_buffer { + grpc_chttp2_incoming_metadata_buffer(gpr_arena* arena) : arena(arena) { + grpc_metadata_batch_init(&batch); + batch.deadline = GRPC_MILLIS_INF_FUTURE; + } + ~grpc_chttp2_incoming_metadata_buffer() { + grpc_metadata_batch_destroy(&batch); + } + gpr_arena* arena; grpc_metadata_batch batch; - size_t size; // total size of metadata -} grpc_chttp2_incoming_metadata_buffer; - -/** assumes everything initially zeroed */ -void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer* buffer, gpr_arena* arena); -void grpc_chttp2_incoming_metadata_buffer_destroy( - grpc_chttp2_incoming_metadata_buffer* buffer); + size_t size = 0; // total size of metadata +}; + void grpc_chttp2_incoming_metadata_buffer_publish( grpc_chttp2_incoming_metadata_buffer* buffer, grpc_metadata_batch* batch); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 202017641b..3ee408c103 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -103,8 +103,8 @@ const char* grpc_chttp2_initiate_write_reason_string( grpc_chttp2_initiate_write_reason reason); typedef struct { - grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT]; - uint64_t inflight_id; + grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT] = {}; + uint64_t inflight_id = 0; } grpc_chttp2_ping_queue; typedef struct { @@ -280,6 +280,11 @@ typedef enum { } grpc_chttp2_keepalive_state; struct grpc_chttp2_transport { + grpc_chttp2_transport(const grpc_channel_args* channel_args, + grpc_endpoint* ep, bool is_client, + grpc_resource_user* resource_user); + ~grpc_chttp2_transport(); + grpc_transport base; /* must be first */ gpr_refcount refs; grpc_endpoint* ep; @@ -289,27 +294,27 @@ struct grpc_chttp2_transport { grpc_combiner* combiner; - grpc_closure* notify_on_receive_settings; + grpc_closure* notify_on_receive_settings = nullptr; /** write execution state of the transport */ - grpc_chttp2_write_state write_state; + grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE; /** is this the first write in a series of writes? set when we initiate writing from idle, cleared when we initiate writing from writing+more */ - bool is_first_write_in_batch; + bool is_first_write_in_batch = false; /** is the transport destroying itself? */ - uint8_t destroying; + uint8_t destroying = false; /** has the upper layer closed the transport? */ - grpc_error* closed_with_error; + grpc_error* closed_with_error = GRPC_ERROR_NONE; /** is there a read request to the endpoint outstanding? */ - uint8_t endpoint_reading; + uint8_t endpoint_reading = 1; - grpc_chttp2_optimization_target opt_target; + grpc_chttp2_optimization_target opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; /** various lists of streams */ - grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; + grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {}; /** maps stream id to grpc_chttp2_stream objects */ grpc_chttp2_stream_map stream_map; @@ -326,7 +331,7 @@ struct grpc_chttp2_transport { /** address to place a newly accepted stream - set and unset by grpc_chttp2_parsing_accept_stream; used by init_stream to publish the accepted server stream */ - grpc_chttp2_stream** accepting_stream; + grpc_chttp2_stream** accepting_stream = nullptr; struct { /* accept stream callback */ @@ -350,41 +355,43 @@ struct grpc_chttp2_transport { /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? */ - uint32_t write_buffer_size; + uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow; /** Set to a grpc_error object if a goaway frame is received. By default, set * to GRPC_ERROR_NONE */ - grpc_error* goaway_error; + grpc_error* goaway_error = GRPC_ERROR_NONE; - grpc_chttp2_sent_goaway_state sent_goaway_state; + grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND; /** are the local settings dirty and need to be sent? */ - bool dirtied_local_settings; + bool dirtied_local_settings = true; /** have local settings been sent? */ - bool sent_local_settings; - /** bitmask of setting indexes to send out */ - uint32_t force_send_settings; + bool sent_local_settings = false; + /** bitmask of setting indexes to send out + Hack: it's common for implementations to assume 65536 bytes initial send + window -- this should by rights be 0 */ + uint32_t force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; /** settings values */ uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; /** what is the next stream id to be allocated by this peer? copied to next_stream_id in parsing when parsing commences */ - uint32_t next_stream_id; + uint32_t next_stream_id = 0; /** last new stream id */ - uint32_t last_new_stream_id; + uint32_t last_new_stream_id = 0; /** ping queues for various ping insertion points */ - grpc_chttp2_ping_queue ping_queue; + grpc_chttp2_ping_queue ping_queue = grpc_chttp2_ping_queue(); grpc_chttp2_repeated_ping_policy ping_policy; grpc_chttp2_repeated_ping_state ping_state; - uint64_t ping_ctr; /* unique id for pings */ + uint64_t ping_ctr = 0; /* unique id for pings */ grpc_closure retry_initiate_ping_locked; /** ping acks */ - size_t ping_ack_count; - size_t ping_ack_capacity; - uint64_t* ping_acks; + size_t ping_ack_count = 0; + size_t ping_ack_capacity = 0; + uint64_t* ping_acks = nullptr; grpc_chttp2_server_ping_recv_state ping_recv_state; /** parser for headers */ @@ -410,22 +417,22 @@ struct grpc_chttp2_transport { int64_t initial_window_update = 0; /* deframing */ - grpc_chttp2_deframe_transport_state deframe_state; - uint8_t incoming_frame_type; - uint8_t incoming_frame_flags; - uint8_t header_eof; - bool is_first_frame; - uint32_t expect_continuation_stream_id; - uint32_t incoming_frame_size; - uint32_t incoming_stream_id; + grpc_chttp2_deframe_transport_state deframe_state = GRPC_DTS_CLIENT_PREFIX_0; + uint8_t incoming_frame_type = 0; + uint8_t incoming_frame_flags = 0; + uint8_t header_eof = 0; + bool is_first_frame = true; + uint32_t expect_continuation_stream_id = 0; + uint32_t incoming_frame_size = 0; + uint32_t incoming_stream_id = 0; /* active parser */ - void* parser_data; - grpc_chttp2_stream* incoming_stream; + void* parser_data = nullptr; + grpc_chttp2_stream* incoming_stream = nullptr; grpc_error* (*parser)(void* parser_user_data, grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_slice slice, int is_last); - grpc_chttp2_write_cb* write_cb_pool; + grpc_chttp2_write_cb* write_cb_pool = nullptr; /* bdp estimator */ grpc_closure next_bdp_ping_timer_expired_locked; @@ -434,23 +441,23 @@ struct grpc_chttp2_transport { /* if non-NULL, close the transport with this error when writes are finished */ - grpc_error* close_transport_on_writes_finished; + grpc_error* close_transport_on_writes_finished = GRPC_ERROR_NONE; /* a list of closures to run after writes are finished */ - grpc_closure_list run_after_write; + grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT; /* buffer pool state */ /** have we scheduled a benign cleanup? */ - bool benign_reclaimer_registered; + bool benign_reclaimer_registered = false; /** have we scheduled a destructive cleanup? */ - bool destructive_reclaimer_registered; + bool destructive_reclaimer_registered = false; /** benign cleanup closure */ grpc_closure benign_reclaimer_locked; /** destructive cleanup closure */ grpc_closure destructive_reclaimer_locked; /* next bdp ping timer */ - bool have_next_bdp_ping_timer; + bool have_next_bdp_ping_timer = false; grpc_timer next_bdp_ping_timer; /* keep-alive ping support */ @@ -471,12 +478,12 @@ struct grpc_chttp2_transport { /** grace period for a ping to complete before watchdog kicks in */ grpc_millis keepalive_timeout; /** if keepalive pings are allowed when there's no outstanding streams */ - bool keepalive_permit_without_calls; + bool keepalive_permit_without_calls = false; /** keep-alive state machine state */ grpc_chttp2_keepalive_state keepalive_state; grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket; - uint32_t num_messages_in_next_write; + uint32_t num_messages_in_next_write = 0; }; typedef enum { @@ -487,6 +494,10 @@ typedef enum { } grpc_published_metadata_method; struct grpc_chttp2_stream { + grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena); + ~grpc_chttp2_stream(); + grpc_chttp2_transport* t; grpc_stream_refcount* refcount; @@ -494,63 +505,63 @@ struct grpc_chttp2_stream { grpc_closure* destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; - uint8_t included[STREAM_LIST_COUNT]; + uint8_t included[STREAM_LIST_COUNT] = {}; /** HTTP2 stream id for this stream, or zero if one has not been assigned */ - uint32_t id; + uint32_t id = 0; /** things the upper layers would like to send */ - grpc_metadata_batch* send_initial_metadata; - grpc_closure* send_initial_metadata_finished; - grpc_metadata_batch* send_trailing_metadata; - grpc_closure* send_trailing_metadata_finished; + grpc_metadata_batch* send_initial_metadata = nullptr; + grpc_closure* send_initial_metadata_finished = nullptr; + grpc_metadata_batch* send_trailing_metadata = nullptr; + grpc_closure* send_trailing_metadata_finished = nullptr; grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message; - uint32_t fetched_send_message_length; - grpc_slice fetching_slice; + uint32_t fetched_send_message_length = 0; + grpc_slice fetching_slice = grpc_empty_slice(); int64_t next_message_end_offset; - int64_t flow_controlled_bytes_written; - int64_t flow_controlled_bytes_flowed; + int64_t flow_controlled_bytes_written = 0; + int64_t flow_controlled_bytes_flowed = 0; grpc_closure complete_fetch_locked; - grpc_closure* fetching_send_message_finished; + grpc_closure* fetching_send_message_finished = nullptr; grpc_metadata_batch* recv_initial_metadata; - grpc_closure* recv_initial_metadata_ready; - bool* trailing_metadata_available; + grpc_closure* recv_initial_metadata_ready = nullptr; + bool* trailing_metadata_available = nullptr; grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; - grpc_closure* recv_message_ready; + grpc_closure* recv_message_ready = nullptr; grpc_metadata_batch* recv_trailing_metadata; - grpc_closure* recv_trailing_metadata_finished; + grpc_closure* recv_trailing_metadata_finished = nullptr; - grpc_transport_stream_stats* collecting_stats; - grpc_transport_stream_stats stats; + grpc_transport_stream_stats* collecting_stats = nullptr; + grpc_transport_stream_stats stats = grpc_transport_stream_stats(); /** Is this stream closed for writing. */ - bool write_closed; + bool write_closed = false; /** Is this stream reading half-closed. */ - bool read_closed; + bool read_closed = false; /** Are all published incoming byte streams closed. */ - bool all_incoming_byte_streams_finished; + bool all_incoming_byte_streams_finished = false; /** Has this stream seen an error. If true, then pending incoming frames can be thrown away. */ - bool seen_error; + bool seen_error = false; /** Are we buffering writes on this stream? If yes, we won't become writable until there's enough queued up in the flow_controlled_buffer */ - bool write_buffering; + bool write_buffering = false; /** Has trailing metadata been received. */ - bool received_trailing_metadata; + bool received_trailing_metadata = false; /* have we sent or received the EOS bit? */ - bool eos_received; - bool eos_sent; + bool eos_received = false; + bool eos_sent = false; /** the error that resulted in this stream being read-closed */ - grpc_error* read_closed_error; + grpc_error* read_closed_error = GRPC_ERROR_NONE; /** the error that resulted in this stream being write-closed */ - grpc_error* write_closed_error; + grpc_error* write_closed_error = GRPC_ERROR_NONE; - grpc_published_metadata_method published_metadata[2]; - bool final_metadata_requested; + grpc_published_metadata_method published_metadata[2] = {}; + bool final_metadata_requested = false; grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; @@ -560,33 +571,33 @@ struct grpc_chttp2_stream { * Accessed only by application thread when stream->pending_byte_stream == * true */ grpc_slice_buffer unprocessed_incoming_frames_buffer; - grpc_closure* on_next; /* protected by t combiner */ - bool pending_byte_stream; /* protected by t combiner */ + grpc_closure* on_next = nullptr; /* protected by t combiner */ + bool pending_byte_stream = false; /* protected by t combiner */ // cached length of buffer to be used by the transport thread in cases where // stream->pending_byte_stream == true. The value is saved before // application threads are allowed to modify // unprocessed_incoming_frames_buffer - size_t unprocessed_incoming_frames_buffer_cached_length; + size_t unprocessed_incoming_frames_buffer_cached_length = 0; grpc_closure reset_byte_stream; - grpc_error* byte_stream_error; /* protected by t combiner */ - bool received_last_frame; /* protected by t combiner */ + grpc_error* byte_stream_error = GRPC_ERROR_NONE; /* protected by t combiner */ + bool received_last_frame = false; /* protected by t combiner */ - grpc_millis deadline; + grpc_millis deadline = GRPC_MILLIS_INF_FUTURE; /** saw some stream level error */ - grpc_error* forced_close_error; + grpc_error* forced_close_error = GRPC_ERROR_NONE; /** how many header frames have we received? */ - uint8_t header_frames_received; + uint8_t header_frames_received = 0; /** parsing state for data frames */ /* Accessed only by transport thread when stream->pending_byte_stream == false * Accessed only by application thread when stream->pending_byte_stream == * true */ grpc_chttp2_data_parser data_parser; /** number of bytes received - reset at end of parse thread execution */ - int64_t received_bytes; + int64_t received_bytes = 0; - bool sent_initial_metadata; - bool sent_trailing_metadata; + bool sent_initial_metadata = false; + bool sent_trailing_metadata = false; grpc_core::PolymorphicManualConstructor< grpc_core::chttp2::StreamFlowControlBase, @@ -596,32 +607,34 @@ struct grpc_chttp2_stream { grpc_slice_buffer flow_controlled_buffer; - grpc_chttp2_write_cb* on_flow_controlled_cbs; - grpc_chttp2_write_cb* on_write_finished_cbs; - grpc_chttp2_write_cb* finish_after_write; - size_t sending_bytes; + grpc_chttp2_write_cb* on_flow_controlled_cbs = nullptr; + grpc_chttp2_write_cb* on_write_finished_cbs = nullptr; + grpc_chttp2_write_cb* finish_after_write = nullptr; + size_t sending_bytes = 0; /* Stream compression method to be used. */ - grpc_stream_compression_method stream_compression_method; + grpc_stream_compression_method stream_compression_method = + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS; /* Stream decompression method to be used. */ - grpc_stream_compression_method stream_decompression_method; + grpc_stream_compression_method stream_decompression_method = + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS; /** Stream compression decompress context */ - grpc_stream_compression_context* stream_decompression_ctx; + grpc_stream_compression_context* stream_decompression_ctx = nullptr; /** Stream compression compress context */ - grpc_stream_compression_context* stream_compression_ctx; + grpc_stream_compression_context* stream_compression_ctx = nullptr; /** Buffer storing data that is compressed but not sent */ grpc_slice_buffer compressed_data_buffer; /** Amount of uncompressed bytes sent out when compressed_data_buffer is * emptied */ - size_t uncompressed_data_size; + size_t uncompressed_data_size = 0; /** Temporary buffer storing decompressed data */ grpc_slice_buffer decompressed_data_buffer; /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed */ - bool unprocessed_incoming_frames_decompressed; + bool unprocessed_incoming_frames_decompressed = false; /** gRPC header bytes that are already decompressed */ - size_t decompressed_header_bytes; + size_t decompressed_header_bytes = 0; }; /** Transport writing call flow: diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 81e2634e3a..349d8681d5 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -111,16 +111,21 @@ typedef struct grpc_cronet_transport grpc_cronet_transport; /* TODO (makdharma): reorder structure for memory efficiency per http://www.catb.org/esr/structure-packing/#_structure_reordering: */ struct read_state { + read_state(gpr_arena* arena) + : trailing_metadata(arena), initial_metadata(arena) { + grpc_slice_buffer_init(&read_slice_buffer); + } + /* vars to store data coming from server */ - char* read_buffer; - bool length_field_received; - int received_bytes; - int remaining_bytes; - int length_field; - bool compressed; - char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]; - char* payload_field; - bool read_stream_closed; + char* read_buffer = nullptr; + bool length_field_received = false; + int received_bytes = 0; + int remaining_bytes = 0; + int length_field = 0; + bool compressed = 0; + char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {}; + char* payload_field = nullptr; + bool read_stream_closed = 0; /* vars for holding data destined for the application */ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs; @@ -128,59 +133,71 @@ struct read_state { /* vars for trailing metadata */ grpc_chttp2_incoming_metadata_buffer trailing_metadata; - bool trailing_metadata_valid; + bool trailing_metadata_valid = false; /* vars for initial metadata */ grpc_chttp2_incoming_metadata_buffer initial_metadata; }; struct write_state { - char* write_buffer; + char* write_buffer = nullptr; }; /* track state of one stream op */ struct op_state { - bool state_op_done[OP_NUM_OPS]; - bool state_callback_received[OP_NUM_OPS]; + op_state(gpr_arena* arena) : rs(arena) {} + + bool state_op_done[OP_NUM_OPS] = {}; + bool state_callback_received[OP_NUM_OPS] = {}; /* A non-zero gRPC status code has been seen */ - bool fail_state; + bool fail_state = false; /* Transport is discarding all buffered messages */ - bool flush_read; - bool flush_cronet_when_ready; - bool pending_write_for_trailer; - bool pending_send_message; + bool flush_read = false; + bool flush_cronet_when_ready = false; + bool pending_write_for_trailer = false; + bool pending_send_message = false; /* User requested RECV_TRAILING_METADATA */ - bool pending_recv_trailing_metadata; + bool pending_recv_trailing_metadata = false; /* Cronet has not issued a callback of a bidirectional read */ - bool pending_read_from_cronet; - grpc_error* cancel_error; + bool pending_read_from_cronet = false; + grpc_error* cancel_error = GRPC_ERROR_NONE; /* data structure for storing data coming from server */ struct read_state rs; /* data structure for storing data going to the server */ struct write_state ws; }; +struct stream_obj; + struct op_and_state { + op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op); + grpc_transport_stream_op_batch op; struct op_state state; - bool done; - struct stream_obj* s; /* Pointer back to the stream object */ - struct op_and_state* next; /* next op_and_state in the linked list */ + bool done = false; + struct stream_obj* s; /* Pointer back to the stream object */ + /* next op_and_state in the linked list */ + struct op_and_state* next; }; struct op_storage { - int num_pending_ops; - struct op_and_state* head; + int num_pending_ops = 0; + struct op_and_state* head = nullptr; }; struct stream_obj { + stream_obj(grpc_transport* gt, grpc_stream* gs, + grpc_stream_refcount* refcount, gpr_arena* arena); + ~stream_obj(); + gpr_arena* arena; - struct op_and_state* oas; - grpc_transport_stream_op_batch* curr_op; + struct op_and_state* oas = nullptr; + grpc_transport_stream_op_batch* curr_op = nullptr; grpc_cronet_transport* curr_ct; grpc_stream* curr_gs; - bidirectional_stream* cbs; - bidirectional_stream_header_array header_array; + bidirectional_stream* cbs = nullptr; + bidirectional_stream_header_array header_array = + bidirectional_stream_header_array(); // Zero-initialize the structure. /* Stream level state. Some state will be tracked both at stream and stream_op * level */ @@ -195,7 +212,6 @@ struct stream_obj { /* Refcount object of the stream */ grpc_stream_refcount* refcount; }; -typedef struct stream_obj stream_obj; #ifndef NDEBUG #define GRPC_CRONET_STREAM_REF(stream, reason) \ @@ -306,6 +322,10 @@ static grpc_error* make_error_with_desc(int error_code, const char* desc) { return error; } +inline op_and_state::op_and_state(stream_obj* s, + const grpc_transport_stream_op_batch& op) + : op(op), state(s->arena), s(s), next(s->storage.head) {} + /* Add a new stream op to op storage. */ @@ -314,14 +334,8 @@ static void add_to_storage(struct stream_obj* s, struct op_storage* storage = &s->storage; /* add new op at the beginning of the linked list. The memory is freed in remove_from_storage */ - struct op_and_state* new_op = static_cast<struct op_and_state*>( - gpr_malloc(sizeof(struct op_and_state))); - memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch)); - memset(&new_op->state, 0, sizeof(new_op->state)); - new_op->s = s; - new_op->done = false; + op_and_state* new_op = grpc_core::New<op_and_state>(s, *op); gpr_mu_lock(&s->mu); - new_op->next = storage->head; storage->head = new_op; storage->num_pending_ops++; if (op->send_message) { @@ -347,7 +361,7 @@ static void remove_from_storage(struct stream_obj* s, } if (s->storage.head == oas) { s->storage.head = oas->next; - gpr_free(oas); + grpc_core::Delete(oas); s->storage.num_pending_ops--; CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas, s->storage.num_pending_ops); @@ -358,7 +372,7 @@ static void remove_from_storage(struct stream_obj* s, s->storage.num_pending_ops--; CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas, s->storage.num_pending_ops); - gpr_free(oas); + grpc_core::Delete(oas); break; } else if (GPR_UNLIKELY(curr->next == nullptr)) { CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free"); @@ -540,10 +554,6 @@ static void on_response_headers_received( } gpr_mu_lock(&s->mu); - memset(&s->state.rs.initial_metadata, 0, - sizeof(s->state.rs.initial_metadata)); - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, - s->arena); convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata); s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -634,11 +644,7 @@ static void on_response_trailers_received( stream_obj* s = static_cast<stream_obj*>(stream->annotation); grpc_cronet_transport* t = s->curr_ct; gpr_mu_lock(&s->mu); - memset(&s->state.rs.trailing_metadata, 0, - sizeof(s->state.rs.trailing_metadata)); s->state.rs.trailing_metadata_valid = false; - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata, - s->arena); convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata); if (trailers->count > 0) { s->state.rs.trailing_metadata_valid = true; @@ -1354,36 +1360,28 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { Functions used by upper layers to access transport functionality. */ +inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs, + grpc_stream_refcount* refcount, gpr_arena* arena) + : arena(arena), + curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)), + curr_gs(gs), + state(arena), + refcount(refcount) { + GRPC_CRONET_STREAM_REF(this, "cronet transport"); + gpr_mu_init(&mu); +} + +inline stream_obj::~stream_obj() { + null_and_maybe_free_read_buffer(this); + /* Clean up read_slice_buffer in case there is unread data. */ + grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer); + GRPC_ERROR_UNREF(state.cancel_error); +} + static int init_stream(grpc_transport* gt, grpc_stream* gs, grpc_stream_refcount* refcount, const void* server_data, gpr_arena* arena) { - stream_obj* s = reinterpret_cast<stream_obj*>(gs); - - s->refcount = refcount; - GRPC_CRONET_STREAM_REF(s, "cronet transport"); - memset(&s->storage, 0, sizeof(s->storage)); - s->storage.head = nullptr; - memset(&s->state, 0, sizeof(s->state)); - s->curr_op = nullptr; - s->cbs = nullptr; - memset(&s->header_array, 0, sizeof(s->header_array)); - memset(&s->state.rs, 0, sizeof(s->state.rs)); - memset(&s->state.ws, 0, sizeof(s->state.ws)); - memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done)); - memset(s->state.state_callback_received, 0, - sizeof(s->state.state_callback_received)); - s->state.fail_state = s->state.flush_read = false; - s->state.cancel_error = nullptr; - s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; - s->state.pending_send_message = false; - s->state.pending_recv_trailing_metadata = false; - s->state.pending_read_from_cronet = false; - - s->curr_gs = gs; - s->curr_ct = reinterpret_cast<grpc_cronet_transport*>(gt); - s->arena = arena; - - gpr_mu_init(&s->mu); + new (gs) stream_obj(gt, gs, refcount, arena); return 0; } @@ -1426,10 +1424,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, static void destroy_stream(grpc_transport* gt, grpc_stream* gs, grpc_closure* then_schedule_closure) { stream_obj* s = reinterpret_cast<stream_obj*>(gs); - null_and_maybe_free_read_buffer(s); - /* Clean up read_slice_buffer in case there is unread data. */ - grpc_slice_buffer_destroy_internal(&s->state.rs.read_slice_buffer); - GRPC_ERROR_UNREF(s->state.cancel_error); + s->~stream_obj(); GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 9dbd095843..1c4e2e79fe 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -40,18 +40,68 @@ if (grpc_inproc_trace.enabled()) gpr_log(__VA_ARGS__); \ } while (0) -static grpc_slice g_empty_slice; -static grpc_slice g_fake_path_key; -static grpc_slice g_fake_path_value; -static grpc_slice g_fake_auth_key; -static grpc_slice g_fake_auth_value; +namespace { +grpc_slice g_empty_slice; +grpc_slice g_fake_path_key; +grpc_slice g_fake_path_value; +grpc_slice g_fake_auth_key; +grpc_slice g_fake_auth_value; + +struct inproc_stream; +bool cancel_stream_locked(inproc_stream* s, grpc_error* error); +void op_state_machine(void* arg, grpc_error* error); +void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, + bool is_initial); +grpc_error* fill_in_metadata(inproc_stream* s, + const grpc_metadata_batch* metadata, + uint32_t flags, grpc_metadata_batch* out_md, + uint32_t* outflags, bool* markfilled); + +struct shared_mu { + shared_mu() { + // Share one lock between both sides since both sides get affected + gpr_mu_init(&mu); + gpr_ref_init(&refs, 2); + } -typedef struct { gpr_mu mu; gpr_refcount refs; -} shared_mu; +}; + +struct inproc_transport { + inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu, + bool is_client) + : mu(mu), is_client(is_client) { + base.vtable = vtable; + // Start each side of transport with 2 refs since they each have a ref + // to the other + gpr_ref_init(&refs, 2); + grpc_connectivity_state_init(&connectivity, GRPC_CHANNEL_READY, + is_client ? "inproc_client" : "inproc_server"); + } + + ~inproc_transport() { + grpc_connectivity_state_destroy(&connectivity); + if (gpr_unref(&mu->refs)) { + gpr_free(mu); + } + } + + void ref() { + INPROC_LOG(GPR_INFO, "ref_transport %p", this); + gpr_ref(&refs); + } + + void unref() { + INPROC_LOG(GPR_INFO, "unref_transport %p", this); + if (!gpr_unref(&refs)) { + return; + } + INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this); + this->~inproc_transport(); + gpr_free(this); + } -typedef struct inproc_transport { grpc_transport base; shared_mu* mu; gpr_refcount refs; @@ -60,128 +110,174 @@ typedef struct inproc_transport { void (*accept_stream_cb)(void* user_data, grpc_transport* transport, const void* server_data); void* accept_stream_data; - bool is_closed; + bool is_closed = false; struct inproc_transport* other_side; - struct inproc_stream* stream_list; -} inproc_transport; + struct inproc_stream* stream_list = nullptr; +}; + +struct inproc_stream { + inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena) + : t(t), refs(refcount), arena(arena) { + // Ref this stream right now for ctor and list. + ref("inproc_init_stream:init"); + ref("inproc_init_stream:list"); + + grpc_metadata_batch_init(&to_read_initial_md); + grpc_metadata_batch_init(&to_read_trailing_md); + GRPC_CLOSURE_INIT(&op_closure, op_state_machine, this, + grpc_schedule_on_exec_ctx); + grpc_metadata_batch_init(&write_buffer_initial_md); + grpc_metadata_batch_init(&write_buffer_trailing_md); + + stream_list_prev = nullptr; + gpr_mu_lock(&t->mu->mu); + stream_list_next = t->stream_list; + if (t->stream_list) { + t->stream_list->stream_list_prev = this; + } + t->stream_list = this; + gpr_mu_unlock(&t->mu->mu); + + if (!server_data) { + t->ref(); + inproc_transport* st = t->other_side; + st->ref(); + other_side = nullptr; // will get filled in soon + // Pass the client-side stream address to the server-side for a ref + ref("inproc_init_stream:clt"); // ref it now on behalf of server + // side to avoid destruction + INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", + st->accept_stream_cb, st->accept_stream_data); + (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)this); + } else { + // This is the server-side and is being called through accept_stream_cb + inproc_stream* cs = (inproc_stream*)server_data; + other_side = cs; + // Ref the server-side stream on behalf of the client now + ref("inproc_init_stream:srv"); + + // Now we are about to affect the other side, so lock the transport + // to make sure that it doesn't get destroyed + gpr_mu_lock(&t->mu->mu); + cs->other_side = this; + // Now transfer from the other side's write_buffer if any to the to_read + // buffer + if (cs->write_buffer_initial_md_filled) { + fill_in_metadata(this, &cs->write_buffer_initial_md, + cs->write_buffer_initial_md_flags, &to_read_initial_md, + &to_read_initial_md_flags, &to_read_initial_md_filled); + deadline = GPR_MIN(deadline, cs->write_buffer_deadline); + grpc_metadata_batch_clear(&cs->write_buffer_initial_md); + cs->write_buffer_initial_md_filled = false; + } + if (cs->write_buffer_trailing_md_filled) { + fill_in_metadata(this, &cs->write_buffer_trailing_md, 0, + &to_read_trailing_md, nullptr, + &to_read_trailing_md_filled); + grpc_metadata_batch_clear(&cs->write_buffer_trailing_md); + cs->write_buffer_trailing_md_filled = false; + } + if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) { + cancel_other_error = cs->write_buffer_cancel_error; + cs->write_buffer_cancel_error = GRPC_ERROR_NONE; + } + + gpr_mu_unlock(&t->mu->mu); + } + } + + ~inproc_stream() { + GRPC_ERROR_UNREF(write_buffer_cancel_error); + GRPC_ERROR_UNREF(cancel_self_error); + GRPC_ERROR_UNREF(cancel_other_error); + + if (recv_inited) { + grpc_slice_buffer_destroy_internal(&recv_message); + } + + t->unref(); + + if (closure_at_destroy) { + GRPC_CLOSURE_SCHED(closure_at_destroy, GRPC_ERROR_NONE); + } + } + +#ifndef NDEBUG +#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason) +#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason) +#else +#define STREAM_REF(refs, reason) grpc_stream_ref(refs) +#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs) +#endif + void ref(const char* reason) { + INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason); + STREAM_REF(refs, reason); + } + + void unref(const char* reason) { + INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason); + STREAM_UNREF(refs, reason); + } +#undef STREAM_REF +#undef STREAM_UNREF -typedef struct inproc_stream { inproc_transport* t; grpc_metadata_batch to_read_initial_md; - uint32_t to_read_initial_md_flags; - bool to_read_initial_md_filled; + uint32_t to_read_initial_md_flags = 0; + bool to_read_initial_md_filled = false; grpc_metadata_batch to_read_trailing_md; - bool to_read_trailing_md_filled; - bool ops_needed; - bool op_closure_scheduled; + bool to_read_trailing_md_filled = false; + bool ops_needed = false; + bool op_closure_scheduled = false; grpc_closure op_closure; // Write buffer used only during gap at init time when client-side // stream is set up but server side stream is not yet set up grpc_metadata_batch write_buffer_initial_md; - bool write_buffer_initial_md_filled; - uint32_t write_buffer_initial_md_flags; - grpc_millis write_buffer_deadline; + bool write_buffer_initial_md_filled = false; + uint32_t write_buffer_initial_md_flags = 0; + grpc_millis write_buffer_deadline = GRPC_MILLIS_INF_FUTURE; grpc_metadata_batch write_buffer_trailing_md; - bool write_buffer_trailing_md_filled; - grpc_error* write_buffer_cancel_error; + bool write_buffer_trailing_md_filled = false; + grpc_error* write_buffer_cancel_error = GRPC_ERROR_NONE; struct inproc_stream* other_side; - bool other_side_closed; // won't talk anymore - bool write_buffer_other_side_closed; // on hold + bool other_side_closed = false; // won't talk anymore + bool write_buffer_other_side_closed = false; // on hold grpc_stream_refcount* refs; - grpc_closure* closure_at_destroy; + grpc_closure* closure_at_destroy = nullptr; gpr_arena* arena; - grpc_transport_stream_op_batch* send_message_op; - grpc_transport_stream_op_batch* send_trailing_md_op; - grpc_transport_stream_op_batch* recv_initial_md_op; - grpc_transport_stream_op_batch* recv_message_op; - grpc_transport_stream_op_batch* recv_trailing_md_op; + grpc_transport_stream_op_batch* send_message_op = nullptr; + grpc_transport_stream_op_batch* send_trailing_md_op = nullptr; + grpc_transport_stream_op_batch* recv_initial_md_op = nullptr; + grpc_transport_stream_op_batch* recv_message_op = nullptr; + grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr; grpc_slice_buffer recv_message; grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream; - bool recv_inited; + bool recv_inited = false; - bool initial_md_sent; - bool trailing_md_sent; - bool initial_md_recvd; - bool trailing_md_recvd; + bool initial_md_sent = false; + bool trailing_md_sent = false; + bool initial_md_recvd = false; + bool trailing_md_recvd = false; - bool closed; + bool closed = false; - grpc_error* cancel_self_error; - grpc_error* cancel_other_error; + grpc_error* cancel_self_error = GRPC_ERROR_NONE; + grpc_error* cancel_other_error = GRPC_ERROR_NONE; - grpc_millis deadline; + grpc_millis deadline = GRPC_MILLIS_INF_FUTURE; - bool listed; + bool listed = true; struct inproc_stream* stream_list_prev; struct inproc_stream* stream_list_next; -} inproc_stream; - -static bool cancel_stream_locked(inproc_stream* s, grpc_error* error); -static void op_state_machine(void* arg, grpc_error* error); - -static void ref_transport(inproc_transport* t) { - INPROC_LOG(GPR_INFO, "ref_transport %p", t); - gpr_ref(&t->refs); -} - -static void really_destroy_transport(inproc_transport* t) { - INPROC_LOG(GPR_INFO, "really_destroy_transport %p", t); - grpc_connectivity_state_destroy(&t->connectivity); - if (gpr_unref(&t->mu->refs)) { - gpr_free(t->mu); - } - gpr_free(t); -} +}; -static void unref_transport(inproc_transport* t) { - INPROC_LOG(GPR_INFO, "unref_transport %p", t); - if (gpr_unref(&t->refs)) { - really_destroy_transport(t); - } -} - -#ifndef NDEBUG -#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason) -#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason) -#else -#define STREAM_REF(refs, reason) grpc_stream_ref(refs) -#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs) -#endif - -static void ref_stream(inproc_stream* s, const char* reason) { - INPROC_LOG(GPR_INFO, "ref_stream %p %s", s, reason); - STREAM_REF(s->refs, reason); -} - -static void unref_stream(inproc_stream* s, const char* reason) { - INPROC_LOG(GPR_INFO, "unref_stream %p %s", s, reason); - STREAM_UNREF(s->refs, reason); -} - -static void really_destroy_stream(inproc_stream* s) { - INPROC_LOG(GPR_INFO, "really_destroy_stream %p", s); - - GRPC_ERROR_UNREF(s->write_buffer_cancel_error); - GRPC_ERROR_UNREF(s->cancel_self_error); - GRPC_ERROR_UNREF(s->cancel_other_error); - - if (s->recv_inited) { - grpc_slice_buffer_destroy_internal(&s->recv_message); - } - - unref_transport(s->t); - - if (s->closure_at_destroy) { - GRPC_CLOSURE_SCHED(s->closure_at_destroy, GRPC_ERROR_NONE); - } -} - -static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, - bool is_initial) { +void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, + bool is_initial) { for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr; md = md->next) { char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md)); @@ -193,10 +289,10 @@ static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, } } -static grpc_error* fill_in_metadata(inproc_stream* s, - const grpc_metadata_batch* metadata, - uint32_t flags, grpc_metadata_batch* out_md, - uint32_t* outflags, bool* markfilled) { +grpc_error* fill_in_metadata(inproc_stream* s, + const grpc_metadata_batch* metadata, + uint32_t flags, grpc_metadata_batch* out_md, + uint32_t* outflags, bool* markfilled) { if (grpc_inproc_trace.enabled()) { log_metadata(metadata, s->t->is_client, outflags != nullptr); } @@ -221,109 +317,16 @@ static grpc_error* fill_in_metadata(inproc_stream* s, return error; } -static int init_stream(grpc_transport* gt, grpc_stream* gs, - grpc_stream_refcount* refcount, const void* server_data, - gpr_arena* arena) { +int init_stream(grpc_transport* gt, grpc_stream* gs, + grpc_stream_refcount* refcount, const void* server_data, + gpr_arena* arena) { INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data); inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); - inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); - s->arena = arena; - - s->refs = refcount; - // Ref this stream right now - ref_stream(s, "inproc_init_stream:init"); - - grpc_metadata_batch_init(&s->to_read_initial_md); - s->to_read_initial_md_flags = 0; - s->to_read_initial_md_filled = false; - grpc_metadata_batch_init(&s->to_read_trailing_md); - s->to_read_trailing_md_filled = false; - grpc_metadata_batch_init(&s->write_buffer_initial_md); - s->write_buffer_initial_md_flags = 0; - s->write_buffer_initial_md_filled = false; - grpc_metadata_batch_init(&s->write_buffer_trailing_md); - s->write_buffer_trailing_md_filled = false; - s->ops_needed = false; - s->op_closure_scheduled = false; - GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s, - grpc_schedule_on_exec_ctx); - s->t = t; - s->closure_at_destroy = nullptr; - s->other_side_closed = false; - - s->initial_md_sent = s->trailing_md_sent = s->initial_md_recvd = - s->trailing_md_recvd = false; - - s->closed = false; - - s->cancel_self_error = GRPC_ERROR_NONE; - s->cancel_other_error = GRPC_ERROR_NONE; - s->write_buffer_cancel_error = GRPC_ERROR_NONE; - s->deadline = GRPC_MILLIS_INF_FUTURE; - s->write_buffer_deadline = GRPC_MILLIS_INF_FUTURE; - - s->stream_list_prev = nullptr; - gpr_mu_lock(&t->mu->mu); - s->listed = true; - ref_stream(s, "inproc_init_stream:list"); - s->stream_list_next = t->stream_list; - if (t->stream_list) { - t->stream_list->stream_list_prev = s; - } - t->stream_list = s; - gpr_mu_unlock(&t->mu->mu); - - if (!server_data) { - ref_transport(t); - inproc_transport* st = t->other_side; - ref_transport(st); - s->other_side = nullptr; // will get filled in soon - // Pass the client-side stream address to the server-side for a ref - ref_stream(s, "inproc_init_stream:clt"); // ref it now on behalf of server - // side to avoid destruction - INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", st->accept_stream_cb, - st->accept_stream_data); - (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)s); - } else { - // This is the server-side and is being called through accept_stream_cb - inproc_stream* cs = (inproc_stream*)server_data; - s->other_side = cs; - // Ref the server-side stream on behalf of the client now - ref_stream(s, "inproc_init_stream:srv"); - - // Now we are about to affect the other side, so lock the transport - // to make sure that it doesn't get destroyed - gpr_mu_lock(&s->t->mu->mu); - cs->other_side = s; - // Now transfer from the other side's write_buffer if any to the to_read - // buffer - if (cs->write_buffer_initial_md_filled) { - fill_in_metadata(s, &cs->write_buffer_initial_md, - cs->write_buffer_initial_md_flags, - &s->to_read_initial_md, &s->to_read_initial_md_flags, - &s->to_read_initial_md_filled); - s->deadline = GPR_MIN(s->deadline, cs->write_buffer_deadline); - grpc_metadata_batch_clear(&cs->write_buffer_initial_md); - cs->write_buffer_initial_md_filled = false; - } - if (cs->write_buffer_trailing_md_filled) { - fill_in_metadata(s, &cs->write_buffer_trailing_md, 0, - &s->to_read_trailing_md, nullptr, - &s->to_read_trailing_md_filled); - grpc_metadata_batch_clear(&cs->write_buffer_trailing_md); - cs->write_buffer_trailing_md_filled = false; - } - if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) { - s->cancel_other_error = cs->write_buffer_cancel_error; - cs->write_buffer_cancel_error = GRPC_ERROR_NONE; - } - - gpr_mu_unlock(&s->t->mu->mu); - } + new (gs) inproc_stream(t, refcount, server_data, arena); return 0; // return value is not important } -static void close_stream_locked(inproc_stream* s) { +void close_stream_locked(inproc_stream* s) { if (!s->closed) { // Release the metadata that we would have written out grpc_metadata_batch_destroy(&s->write_buffer_initial_md); @@ -341,21 +344,21 @@ static void close_stream_locked(inproc_stream* s) { n->stream_list_prev = p; } s->listed = false; - unref_stream(s, "close_stream:list"); + s->unref("close_stream:list"); } s->closed = true; - unref_stream(s, "close_stream:closing"); + s->unref("close_stream:closing"); } } // This function means that we are done talking/listening to the other side -static void close_other_side_locked(inproc_stream* s, const char* reason) { +void close_other_side_locked(inproc_stream* s, const char* reason) { if (s->other_side != nullptr) { // First release the metadata that came from the other side's arena grpc_metadata_batch_destroy(&s->to_read_initial_md); grpc_metadata_batch_destroy(&s->to_read_trailing_md); - unref_stream(s->other_side, reason); + s->other_side->unref(reason); s->other_side_closed = true; s->other_side = nullptr; } else if (!s->other_side_closed) { @@ -367,9 +370,9 @@ static void close_other_side_locked(inproc_stream* s, const char* reason) { // this stream_op_batch is only one of the pending operations for this // stream. This is called when one of the pending operations for the stream // is done and about to be NULLed out -static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, - grpc_transport_stream_op_batch* op, - const char* msg) { +void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, + grpc_transport_stream_op_batch* op, + const char* msg) { int is_sm = static_cast<int>(op == s->send_message_op); int is_stm = static_cast<int>(op == s->send_trailing_md_op); // TODO(vjpai): We should not consider the recv ops here, since they @@ -386,8 +389,7 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, } } -static void maybe_schedule_op_closure_locked(inproc_stream* s, - grpc_error* error) { +void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) { if (s && s->ops_needed && !s->op_closure_scheduled) { GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error)); s->op_closure_scheduled = true; @@ -395,7 +397,7 @@ static void maybe_schedule_op_closure_locked(inproc_stream* s, } } -static void fail_helper_locked(inproc_stream* s, grpc_error* error) { +void fail_helper_locked(inproc_stream* s, grpc_error* error) { INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s); // If we're failing this side, we need to make sure that // we also send or have already sent trailing metadata @@ -525,8 +527,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { // that the incoming byte stream's next() call will always return // synchronously. That assumption is true today but may not always be // true in the future. -static void message_transfer_locked(inproc_stream* sender, - inproc_stream* receiver) { +void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { size_t remaining = sender->send_message_op->payload->send_message.send_message->length(); if (receiver->recv_inited) { @@ -572,7 +573,7 @@ static void message_transfer_locked(inproc_stream* sender, sender->send_message_op = nullptr; } -static void op_state_machine(void* arg, grpc_error* error) { +void op_state_machine(void* arg, grpc_error* error) { // This function gets called when we have contents in the unprocessed reads // Get what we want based on our ops wanted // Schedule our appropriate closures @@ -847,7 +848,7 @@ done: GRPC_ERROR_UNREF(new_err); } -static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { +bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { bool ret = false; // was the cancel accepted INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, grpc_error_string(error)); if (s->cancel_self_error == GRPC_ERROR_NONE) { @@ -900,10 +901,10 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { return ret; } -static void do_nothing(void* arg, grpc_error* error) {} +void do_nothing(void* arg, grpc_error* error) {} -static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, - grpc_transport_stream_op_batch* op) { +void perform_stream_op(grpc_transport* gt, grpc_stream* gs, + grpc_transport_stream_op_batch* op) { INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op); inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed @@ -1083,7 +1084,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, GRPC_ERROR_UNREF(error); } -static void close_transport_locked(inproc_transport* t) { +void close_transport_locked(inproc_transport* t) { INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed); grpc_connectivity_state_set( &t->connectivity, GRPC_CHANNEL_SHUTDOWN, @@ -1103,7 +1104,7 @@ static void close_transport_locked(inproc_transport* t) { } } -static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { +void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op); gpr_mu_lock(&t->mu->mu); @@ -1136,39 +1137,64 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { gpr_mu_unlock(&t->mu->mu); } -static void destroy_stream(grpc_transport* gt, grpc_stream* gs, - grpc_closure* then_schedule_closure) { +void destroy_stream(grpc_transport* gt, grpc_stream* gs, + grpc_closure* then_schedule_closure) { INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure); inproc_stream* s = reinterpret_cast<inproc_stream*>(gs); s->closure_at_destroy = then_schedule_closure; - really_destroy_stream(s); + s->~inproc_stream(); } -static void destroy_transport(grpc_transport* gt) { +void destroy_transport(grpc_transport* gt) { inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); INPROC_LOG(GPR_INFO, "destroy_transport %p", t); gpr_mu_lock(&t->mu->mu); close_transport_locked(t); gpr_mu_unlock(&t->mu->mu); - unref_transport(t->other_side); - unref_transport(t); + t->other_side->unref(); + t->unref(); } /******************************************************************************* * INTEGRATION GLUE */ -static void set_pollset(grpc_transport* gt, grpc_stream* gs, - grpc_pollset* pollset) { +void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* pollset) { // Nothing to do here } -static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, - grpc_pollset_set* pollset_set) { +void set_pollset_set(grpc_transport* gt, grpc_stream* gs, + grpc_pollset_set* pollset_set) { // Nothing to do here } -static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; } +grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; } + +const grpc_transport_vtable inproc_vtable = { + sizeof(inproc_stream), "inproc", init_stream, + set_pollset, set_pollset_set, perform_stream_op, + perform_transport_op, destroy_stream, destroy_transport, + get_endpoint}; + +/******************************************************************************* + * Main inproc transport functions + */ +void inproc_transports_create(grpc_transport** server_transport, + const grpc_channel_args* server_args, + grpc_transport** client_transport, + const grpc_channel_args* client_args) { + INPROC_LOG(GPR_INFO, "inproc_transports_create"); + shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu(); + inproc_transport* st = new (gpr_malloc(sizeof(*st))) + inproc_transport(&inproc_vtable, mu, /*is_client=*/false); + inproc_transport* ct = new (gpr_malloc(sizeof(*ct))) + inproc_transport(&inproc_vtable, mu, /*is_client=*/true); + st->other_side = ct; + ct->other_side = st; + *server_transport = reinterpret_cast<grpc_transport*>(st); + *client_transport = reinterpret_cast<grpc_transport*>(ct); +} +} // namespace /******************************************************************************* * GLOBAL INIT AND DESTROY @@ -1190,48 +1216,6 @@ void grpc_inproc_transport_init(void) { g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); } -static const grpc_transport_vtable inproc_vtable = { - sizeof(inproc_stream), "inproc", init_stream, - set_pollset, set_pollset_set, perform_stream_op, - perform_transport_op, destroy_stream, destroy_transport, - get_endpoint}; - -/******************************************************************************* - * Main inproc transport functions - */ -static void inproc_transports_create(grpc_transport** server_transport, - const grpc_channel_args* server_args, - grpc_transport** client_transport, - const grpc_channel_args* client_args) { - INPROC_LOG(GPR_INFO, "inproc_transports_create"); - inproc_transport* st = - static_cast<inproc_transport*>(gpr_zalloc(sizeof(*st))); - inproc_transport* ct = - static_cast<inproc_transport*>(gpr_zalloc(sizeof(*ct))); - // Share one lock between both sides since both sides get affected - st->mu = ct->mu = static_cast<shared_mu*>(gpr_malloc(sizeof(*st->mu))); - gpr_mu_init(&st->mu->mu); - gpr_ref_init(&st->mu->refs, 2); - st->base.vtable = &inproc_vtable; - ct->base.vtable = &inproc_vtable; - // Start each side of transport with 2 refs since they each have a ref - // to the other - gpr_ref_init(&st->refs, 2); - gpr_ref_init(&ct->refs, 2); - st->is_client = false; - ct->is_client = true; - grpc_connectivity_state_init(&st->connectivity, GRPC_CHANNEL_READY, - "inproc_server"); - grpc_connectivity_state_init(&ct->connectivity, GRPC_CHANNEL_READY, - "inproc_client"); - st->other_side = ct; - ct->other_side = st; - st->stream_list = nullptr; - ct->stream_list = nullptr; - *server_transport = reinterpret_cast<grpc_transport*>(st); - *client_transport = reinterpret_cast<grpc_transport*>(ct); -} - grpc_channel* grpc_inproc_channel_create(grpc_server* server, grpc_channel_args* args, void* reserved) { |