diff options
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r-- | src/core/transport/chttp2_transport.c | 140 |
1 files changed, 67 insertions, 73 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 8caa10c938..1bbd210e46 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -84,13 +84,15 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(void *t, int iomgr_success_ignored); +static void reading_action(void *t, int iomgr_success_ignored); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, gpr_uint32 value); /** Endpoint callback to process incoming data */ -static void recv_data(void *tp, int success); +static void recv_data(void *tp, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error); /** Start disconnection chain */ static void drop_connection(grpc_chttp2_transport *t); @@ -141,7 +143,6 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor); gpr_slice_buffer_destroy(&t->parsing.qbuf); - gpr_slice_buffer_destroy(&t->read_buffer); grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser); @@ -248,16 +249,12 @@ static void init_transport(grpc_chttp2_transport *t, gpr_slice_buffer_init(&t->writing.outbuf); grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx); grpc_iomgr_closure_init(&t->writing_action, writing_action, t); + grpc_iomgr_closure_init(&t->reading_action, reading_action, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context); - grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, - &t->writing); - grpc_iomgr_closure_init(&t->recv_data, recv_data, t); - gpr_slice_buffer_init(&t->read_buffer); - if (is_client) { gpr_slice_buffer_add( &t->global.qbuf, @@ -505,8 +502,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } } -void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) { - grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr; +void grpc_chttp2_terminate_writing( + grpc_chttp2_transport_writing *transport_writing, int success) { grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); lock(t); @@ -1063,76 +1060,74 @@ static void read_error_locked(grpc_chttp2_transport *t) { } /* tcp read callback */ -static int recv_data_loop(grpc_chttp2_transport *t, int *success) { +static void recv_data(void *tp, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { + grpc_chttp2_transport *t = tp; size_t i; - int keep_reading = 0; + int unref = 0; - lock(t); - i = 0; - GPR_ASSERT(!t->parsing_active); - if (!t->closed) { - t->parsing_active = 1; - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - grpc_chttp2_prepare_to_read(&t->global, &t->parsing); - gpr_mu_unlock(&t->mu); - for (; i < t->read_buffer.count && - grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]); - i++) - ; - gpr_mu_lock(&t->mu); - if (i != t->read_buffer.count) { + switch (error) { + case GRPC_ENDPOINT_CB_SHUTDOWN: + case GRPC_ENDPOINT_CB_EOF: + case GRPC_ENDPOINT_CB_ERROR: + lock(t); drop_connection(t); - } - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - t->global.concurrent_stream_count = - grpc_chttp2_stream_map_size(&t->parsing_stream_map); - if (t->parsing.initial_window_update != 0) { - grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, - update_global_window, t); - t->parsing.initial_window_update = 0; - } - /* handle higher level things */ - grpc_chttp2_publish_reads(&t->global, &t->parsing); - t->parsing_active = 0; - } - if (!*success || i != t->read_buffer.count) { - drop_connection(t); - read_error_locked(t); - } else { - keep_reading = 1; + read_error_locked(t); + unlock(t); + unref = 1; + for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]); + break; + case GRPC_ENDPOINT_CB_OK: + lock(t); + i = 0; + GPR_ASSERT(!t->parsing_active); + if (!t->closed) { + t->parsing_active = 1; + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, + &t->parsing_stream_map); + grpc_chttp2_prepare_to_read(&t->global, &t->parsing); + gpr_mu_unlock(&t->mu); + for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); + i++) { + gpr_slice_unref(slices[i]); + } + gpr_mu_lock(&t->mu); + if (i != nslices) { + drop_connection(t); + } + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, + &t->parsing_stream_map); + t->global.concurrent_stream_count = + grpc_chttp2_stream_map_size(&t->parsing_stream_map); + if (t->parsing.initial_window_update != 0) { + grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, + update_global_window, t); + t->parsing.initial_window_update = 0; + } + /* handle higher level things */ + grpc_chttp2_publish_reads(&t->global, &t->parsing); + t->parsing_active = 0; + } + if (i == nslices) { + grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1); + } else { + read_error_locked(t); + unref = 1; + } + unlock(t); + for (; i < nslices; i++) gpr_slice_unref(slices[i]); + break; } - gpr_slice_buffer_reset_and_unref(&t->read_buffer); - unlock(t); - - if (keep_reading) { - switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) { - case GRPC_ENDPOINT_DONE: - *success = 1; - return 1; - case GRPC_ENDPOINT_ERROR: - *success = 0; - return 1; - case GRPC_ENDPOINT_PENDING: - return 0; - } - } else { + if (unref) { UNREF_TRANSPORT(t, "recv_data"); - return 0; } - - gpr_log(GPR_ERROR, "should never reach here"); - abort(); } -static void recv_data(void *tp, int success) { - grpc_chttp2_transport *t = tp; - - while (recv_data_loop(t, &success)) - ; +static void reading_action(void *pt, int iomgr_success_ignored) { + grpc_chttp2_transport *t = pt; + grpc_endpoint_notify_on_read(t->ep, recv_data, t); } /* @@ -1245,6 +1240,5 @@ void grpc_chttp2_transport_start_reading(grpc_transport *transport, gpr_slice *slices, size_t nslices) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ - gpr_slice_buffer_addn(&t->read_buffer, slices, nslices); - recv_data(t, 1); + recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); } |