diff options
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.cc')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 293 |
1 files changed, 124 insertions, 169 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 56aaada912..a4d616d778 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -39,6 +39,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/timer.h" @@ -117,12 +118,6 @@ static void connectivity_state_set(grpc_chttp2_transport* t, grpc_connectivity_state state, grpc_error* error, const char* reason); -static void incoming_byte_stream_destroy_locked(void* byte_stream, - grpc_error* error_ignored); -static void incoming_byte_stream_publish_error( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error); -static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs); - static void benign_reclaimer_locked(void* t, grpc_error* error); static void destructive_reclaimer_locked(void* t, grpc_error* error); @@ -662,8 +657,8 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, s->t = t; s->refcount = refcount; /* We reserve one 'active stream' that's dropped when the stream is - read-closed. The others are for incoming_byte_streams that are actively - reading */ + read-closed. The others are for Chttp2IncomingByteStreams that are + actively reading */ GRPC_CHTTP2_STREAM_REF(s, "chttp2"); grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); @@ -1256,8 +1251,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, abort(); /* TODO(ctiller): what cleanup here? */ return; /* early out */ } - if (s->fetched_send_message_length == s->fetching_send_message->length) { - grpc_byte_stream_destroy(s->fetching_send_message); + if (s->fetched_send_message_length == s->fetching_send_message->length()) { int64_t notify_offset = s->next_message_end_offset; if (notify_offset <= s->flow_controlled_bytes_written) { grpc_chttp2_complete_closure_step( @@ -1274,20 +1268,19 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, cb->closure = s->fetching_send_message_finished; s->fetching_send_message_finished = nullptr; grpc_chttp2_write_cb** list = - s->fetching_send_message->flags & GRPC_WRITE_THROUGH + s->fetching_send_message->flags() & GRPC_WRITE_THROUGH ? &s->on_write_finished_cbs : &s->on_flow_controlled_cbs; cb->next = *list; *list = cb; } - s->fetching_send_message = nullptr; + s->fetching_send_message.reset(); return; /* early out */ - } else if (grpc_byte_stream_next(s->fetching_send_message, UINT32_MAX, - &s->complete_fetch_locked)) { - grpc_error* error = - grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice); + } else if (s->fetching_send_message->Next(UINT32_MAX, + &s->complete_fetch_locked)) { + grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice); if (error != GRPC_ERROR_NONE) { - grpc_byte_stream_destroy(s->fetching_send_message); + s->fetching_send_message.reset(); grpc_chttp2_cancel_stream(t, s, error); } else { add_fetched_slice_locked(t, s); @@ -1300,14 +1293,14 @@ static void complete_fetch_locked(void* gs, grpc_error* error) { grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs); grpc_chttp2_transport* t = s->t; if (error == GRPC_ERROR_NONE) { - error = grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice); + error = s->fetching_send_message->Pull(&s->fetching_slice); if (error == GRPC_ERROR_NONE) { add_fetched_slice_locked(t, s); continue_fetching_send_locked(t, s); } } if (error != GRPC_ERROR_NONE) { - grpc_byte_stream_destroy(s->fetching_send_message); + s->fetching_send_message.reset(); grpc_chttp2_cancel_stream(t, s, error); } } @@ -1439,7 +1432,7 @@ static void perform_stream_op_locked(void* stream_op, GPR_ASSERT(s->id != 0); grpc_chttp2_mark_stream_writable(t, s); if (!(op->send_message && - (op->payload->send_message.send_message->flags & + (op->payload->send_message.send_message->flags() & GRPC_WRITE_BUFFER_HINT))) { grpc_chttp2_initiate_write( t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); @@ -1466,7 +1459,7 @@ static void perform_stream_op_locked(void* stream_op, if (op->send_message) { GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(); GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE( - op->payload->send_message.send_message->length); + op->payload->send_message.send_message->length()); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->fetching_send_message_finished = add_closure_barrier(op->on_complete); if (s->write_closed) { @@ -1475,7 +1468,7 @@ static void perform_stream_op_locked(void* stream_op, // streaming call might send another message before getting a // recv_message failure, breaking out of its loop, and then // starting recv_trailing_metadata. - grpc_byte_stream_destroy(op->payload->send_message.send_message); + op->payload->send_message.send_message.reset(); grpc_chttp2_complete_closure_step( t, s, &s->fetching_send_message_finished, t->is_client && s->received_trailing_metadata @@ -1488,14 +1481,15 @@ static void perform_stream_op_locked(void* stream_op, GPR_ASSERT(s->fetching_send_message == nullptr); uint8_t* frame_hdr = grpc_slice_buffer_tiny_add( &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES); - uint32_t flags = op_payload->send_message.send_message->flags; + uint32_t flags = op_payload->send_message.send_message->flags(); frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; - size_t len = op_payload->send_message.send_message->length; + size_t len = op_payload->send_message.send_message->length(); frame_hdr[1] = static_cast<uint8_t>(len >> 24); frame_hdr[2] = static_cast<uint8_t>(len >> 16); frame_hdr[3] = static_cast<uint8_t>(len >> 8); frame_hdr[4] = static_cast<uint8_t>(len); - s->fetching_send_message = op_payload->send_message.send_message; + s->fetching_send_message = + std::move(op_payload->send_message.send_message); s->fetched_send_message_length = 0; s->next_message_end_offset = s->flow_controlled_bytes_written + @@ -1947,12 +1941,12 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id, } if (s->pending_byte_stream) { if (s->on_next != nullptr) { - grpc_chttp2_incoming_byte_stream* bs = s->data_parser.parsing_frame; + grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame; if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); } - incoming_byte_stream_publish_error(bs, error); - incoming_byte_stream_unref(bs); + bs->PublishError(error); + bs->Unref(); s->data_parser.parsing_frame = nullptr; } else { GRPC_ERROR_UNREF(s->byte_stream_error); @@ -2096,10 +2090,7 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t, GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); - if (s->fetching_send_message != nullptr) { - grpc_byte_stream_destroy(s->fetching_send_message); - s->fetching_send_message = nullptr; - } + s->fetching_send_message.reset(); grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); @@ -2715,7 +2706,6 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg); - s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_maybe_complete_recv_message(s->t, s); @@ -2731,22 +2721,56 @@ static void reset_byte_stream(void* arg, grpc_error* error) { } } -static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs) { - if (gpr_unref(&bs->refs)) { - gpr_free(bs); +namespace grpc_core { + +Chttp2IncomingByteStream::Chttp2IncomingByteStream( + grpc_chttp2_transport* transport, grpc_chttp2_stream* stream, + uint32_t frame_size, uint32_t flags) + : ByteStream(frame_size, flags), + transport_(transport), + stream_(stream), + remaining_bytes_(frame_size) { + gpr_ref_init(&refs_, 2); + GRPC_ERROR_UNREF(stream->byte_stream_error); + stream->byte_stream_error = GRPC_ERROR_NONE; +} + +void Chttp2IncomingByteStream::OrphanLocked(void* arg, + grpc_error* error_ignored) { + Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg); + grpc_chttp2_stream* s = bs->stream_; + grpc_chttp2_transport* t = s->t; + bs->Unref(); + s->pending_byte_stream = false; + grpc_chttp2_maybe_complete_recv_message(t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); +} + +void Chttp2IncomingByteStream::Orphan() { + GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&destroy_action_, + &Chttp2IncomingByteStream::OrphanLocked, this, + grpc_combiner_scheduler(transport_->combiner)), + GRPC_ERROR_NONE); +} + +void Chttp2IncomingByteStream::Unref() { + if (gpr_unref(&refs_)) { + Delete(this); } } -static void incoming_byte_stream_next_locked(void* argp, - grpc_error* error_ignored) { - grpc_chttp2_incoming_byte_stream* bs = - static_cast<grpc_chttp2_incoming_byte_stream*>(argp); - grpc_chttp2_transport* t = bs->transport; - grpc_chttp2_stream* s = bs->stream; +void Chttp2IncomingByteStream::Ref() { gpr_ref(&refs_); } +void Chttp2IncomingByteStream::NextLocked(void* arg, + grpc_error* error_ignored) { + Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg); + grpc_chttp2_transport* t = bs->transport_; + grpc_chttp2_stream* s = bs->stream_; size_t cur_length = s->frame_storage.length; if (!s->read_closed) { - s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint, + s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint, cur_length); grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s); } @@ -2755,22 +2779,22 @@ static void incoming_byte_stream_next_locked(void* argp, grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer); s->unprocessed_incoming_frames_decompressed = false; - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE); } else if (s->byte_stream_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, + GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != nullptr) { - incoming_byte_stream_unref(s->data_parser.parsing_frame); + s->data_parser.parsing_frame->Unref(); s->data_parser.parsing_frame = nullptr; } } else if (s->read_closed) { - if (bs->remaining_bytes != 0) { + if (bs->remaining_bytes_ != 0) { s->byte_stream_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, + GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != nullptr) { - incoming_byte_stream_unref(s->data_parser.parsing_frame); + s->data_parser.parsing_frame->Unref(); s->data_parser.parsing_frame = nullptr; } } else { @@ -2778,122 +2802,94 @@ static void incoming_byte_stream_next_locked(void* argp, GPR_ASSERT(false); } } else { - s->on_next = bs->next_action.on_complete; + s->on_next = bs->next_action_.on_complete; } - incoming_byte_stream_unref(bs); + bs->Unref(); } -static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream, - size_t max_size_hint, - grpc_closure* on_complete) { +bool Chttp2IncomingByteStream::Next(size_t max_size_hint, + grpc_closure* on_complete) { GPR_TIMER_SCOPE("incoming_byte_stream_next", 0); - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream); - grpc_chttp2_stream* s = bs->stream; - if (s->unprocessed_incoming_frames_buffer.length > 0) { + if (stream_->unprocessed_incoming_frames_buffer.length > 0) { return true; } else { - gpr_ref(&bs->refs); - bs->next_action.max_size_hint = max_size_hint; - bs->next_action.on_complete = on_complete; + Ref(); + next_action_.max_size_hint = max_size_hint; + next_action_.on_complete = on_complete; GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&bs->next_action.closure, - incoming_byte_stream_next_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner)), + GRPC_CLOSURE_INIT(&next_action_.closure, + &Chttp2IncomingByteStream::NextLocked, this, + grpc_combiner_scheduler(transport_->combiner)), GRPC_ERROR_NONE); return false; } } -static grpc_error* incoming_byte_stream_pull(grpc_byte_stream* byte_stream, - grpc_slice* slice) { +grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0); - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream); - grpc_chttp2_stream* s = bs->stream; grpc_error* error; - - if (s->unprocessed_incoming_frames_buffer.length > 0) { - if (!s->unprocessed_incoming_frames_decompressed) { + if (stream_->unprocessed_incoming_frames_buffer.length > 0) { + if (!stream_->unprocessed_incoming_frames_decompressed) { bool end_of_context; - if (!s->stream_decompression_ctx) { - s->stream_decompression_ctx = grpc_stream_compression_context_create( - s->stream_decompression_method); + if (!stream_->stream_decompression_ctx) { + stream_->stream_decompression_ctx = + grpc_stream_compression_context_create( + stream_->stream_decompression_method); } - if (!grpc_stream_decompress(s->stream_decompression_ctx, - &s->unprocessed_incoming_frames_buffer, - &s->decompressed_data_buffer, nullptr, + if (!grpc_stream_decompress(stream_->stream_decompression_ctx, + &stream_->unprocessed_incoming_frames_buffer, + &stream_->decompressed_data_buffer, nullptr, MAX_SIZE_T, &end_of_context)) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error."); return error; } - GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); - grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, - &s->decompressed_data_buffer); - s->unprocessed_incoming_frames_decompressed = true; + GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0); + grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer, + &stream_->decompressed_data_buffer); + stream_->unprocessed_incoming_frames_decompressed = true; if (end_of_context) { - grpc_stream_compression_context_destroy(s->stream_decompression_ctx); - s->stream_decompression_ctx = nullptr; + grpc_stream_compression_context_destroy( + stream_->stream_decompression_ctx); + stream_->stream_decompression_ctx = nullptr; } - if (s->unprocessed_incoming_frames_buffer.length == 0) { + if (stream_->unprocessed_incoming_frames_buffer.length == 0) { *slice = grpc_empty_slice(); } } error = grpc_deframe_unprocessed_incoming_frames( - &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice, - nullptr); + &stream_->data_parser, stream_, + &stream_->unprocessed_incoming_frames_buffer, slice, nullptr); if (error != GRPC_ERROR_NONE) { return error; } } else { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); return error; } return GRPC_ERROR_NONE; } -static void incoming_byte_stream_destroy_locked(void* byte_stream, - grpc_error* error_ignored); - -static void incoming_byte_stream_destroy(grpc_byte_stream* byte_stream) { - GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&bs->destroy_action, - incoming_byte_stream_destroy_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner)), - GRPC_ERROR_NONE); -} - -static void incoming_byte_stream_publish_error( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error) { - grpc_chttp2_stream* s = bs->stream; - +void Chttp2IncomingByteStream::PublishError(grpc_error* error) { GPR_ASSERT(error != GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error)); - s->on_next = nullptr; - GRPC_ERROR_UNREF(s->byte_stream_error); - s->byte_stream_error = GRPC_ERROR_REF(error); - grpc_chttp2_cancel_stream(bs->transport, bs->stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error)); + stream_->on_next = nullptr; + GRPC_ERROR_UNREF(stream_->byte_stream_error); + stream_->byte_stream_error = GRPC_ERROR_REF(error); + grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error)); } -grpc_error* grpc_chttp2_incoming_byte_stream_push( - grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice, - grpc_slice* slice_out) { - grpc_chttp2_stream* s = bs->stream; - - if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { +grpc_error* Chttp2IncomingByteStream::Push(grpc_slice slice, + grpc_slice* slice_out) { + if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"); - - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); grpc_slice_unref_internal(slice); return error; } else { - bs->remaining_bytes -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice); + remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice); if (slice_out != nullptr) { *slice_out = slice; } @@ -2901,66 +2897,25 @@ grpc_error* grpc_chttp2_incoming_byte_stream_push( } } -grpc_error* grpc_chttp2_incoming_byte_stream_finished( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error, - bool reset_on_error) { - grpc_chttp2_stream* s = bs->stream; - +grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error, + bool reset_on_error) { if (error == GRPC_ERROR_NONE) { - if (bs->remaining_bytes != 0) { + if (remaining_bytes_ != 0) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); } } if (error != GRPC_ERROR_NONE && reset_on_error) { - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); } - incoming_byte_stream_unref(bs); + Unref(); return error; } -static void incoming_byte_stream_shutdown(grpc_byte_stream* byte_stream, - grpc_error* error) { - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream); - GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( - bs, error, true /* reset_on_error */)); +void Chttp2IncomingByteStream::Shutdown(grpc_error* error) { + GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */)); } -static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = { - incoming_byte_stream_next, incoming_byte_stream_pull, - incoming_byte_stream_shutdown, incoming_byte_stream_destroy}; - -static void incoming_byte_stream_destroy_locked(void* byte_stream, - grpc_error* error_ignored) { - grpc_chttp2_incoming_byte_stream* bs = - static_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream); - grpc_chttp2_stream* s = bs->stream; - grpc_chttp2_transport* t = s->t; - - GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable); - incoming_byte_stream_unref(bs); - s->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); -} - -grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size, - uint32_t flags) { - grpc_chttp2_incoming_byte_stream* incoming_byte_stream = - static_cast<grpc_chttp2_incoming_byte_stream*>( - gpr_malloc(sizeof(*incoming_byte_stream))); - incoming_byte_stream->base.length = frame_size; - incoming_byte_stream->remaining_bytes = frame_size; - incoming_byte_stream->base.flags = flags; - incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable; - gpr_ref_init(&incoming_byte_stream->refs, 2); - incoming_byte_stream->transport = t; - incoming_byte_stream->stream = s; - GRPC_ERROR_UNREF(s->byte_stream_error); - s->byte_stream_error = GRPC_ERROR_NONE; - return incoming_byte_stream; -} +} // namespace grpc_core /******************************************************************************* * RESOURCE QUOTAS |