aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r--src/core/transport/chttp2_transport.c140
1 files changed, 67 insertions, 73 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 8caa10c938..1bbd210e46 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -84,13 +84,15 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(void *t, int iomgr_success_ignored);
+static void reading_action(void *t, int iomgr_success_ignored);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value);
/** Endpoint callback to process incoming data */
-static void recv_data(void *tp, int success);
+static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error);
/** Start disconnection chain */
static void drop_connection(grpc_chttp2_transport *t);
@@ -141,7 +143,6 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
gpr_slice_buffer_destroy(&t->parsing.qbuf);
- gpr_slice_buffer_destroy(&t->read_buffer);
grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
@@ -248,16 +249,12 @@ static void init_transport(grpc_chttp2_transport *t,
gpr_slice_buffer_init(&t->writing.outbuf);
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
grpc_iomgr_closure_init(&t->writing_action, writing_action, t);
+ grpc_iomgr_closure_init(&t->reading_action, reading_action, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
- grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
- &t->writing);
- grpc_iomgr_closure_init(&t->recv_data, recv_data, t);
- gpr_slice_buffer_init(&t->read_buffer);
-
if (is_client) {
gpr_slice_buffer_add(
&t->global.qbuf,
@@ -505,8 +502,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
}
}
-void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) {
- grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
+void grpc_chttp2_terminate_writing(
+ grpc_chttp2_transport_writing *transport_writing, int success) {
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
lock(t);
@@ -1063,76 +1060,74 @@ static void read_error_locked(grpc_chttp2_transport *t) {
}
/* tcp read callback */
-static int recv_data_loop(grpc_chttp2_transport *t, int *success) {
+static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
+ grpc_chttp2_transport *t = tp;
size_t i;
- int keep_reading = 0;
+ int unref = 0;
- lock(t);
- i = 0;
- GPR_ASSERT(!t->parsing_active);
- if (!t->closed) {
- t->parsing_active = 1;
- /* merge stream lists */
- grpc_chttp2_stream_map_move_into(&t->new_stream_map,
- &t->parsing_stream_map);
- grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
- gpr_mu_unlock(&t->mu);
- for (; i < t->read_buffer.count &&
- grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]);
- i++)
- ;
- gpr_mu_lock(&t->mu);
- if (i != t->read_buffer.count) {
+ switch (error) {
+ case GRPC_ENDPOINT_CB_SHUTDOWN:
+ case GRPC_ENDPOINT_CB_EOF:
+ case GRPC_ENDPOINT_CB_ERROR:
+ lock(t);
drop_connection(t);
- }
- /* merge stream lists */
- grpc_chttp2_stream_map_move_into(&t->new_stream_map,
- &t->parsing_stream_map);
- t->global.concurrent_stream_count =
- grpc_chttp2_stream_map_size(&t->parsing_stream_map);
- if (t->parsing.initial_window_update != 0) {
- grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
- update_global_window, t);
- t->parsing.initial_window_update = 0;
- }
- /* handle higher level things */
- grpc_chttp2_publish_reads(&t->global, &t->parsing);
- t->parsing_active = 0;
- }
- if (!*success || i != t->read_buffer.count) {
- drop_connection(t);
- read_error_locked(t);
- } else {
- keep_reading = 1;
+ read_error_locked(t);
+ unlock(t);
+ unref = 1;
+ for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
+ break;
+ case GRPC_ENDPOINT_CB_OK:
+ lock(t);
+ i = 0;
+ GPR_ASSERT(!t->parsing_active);
+ if (!t->closed) {
+ t->parsing_active = 1;
+ /* merge stream lists */
+ grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+ &t->parsing_stream_map);
+ grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
+ gpr_mu_unlock(&t->mu);
+ for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
+ i++) {
+ gpr_slice_unref(slices[i]);
+ }
+ gpr_mu_lock(&t->mu);
+ if (i != nslices) {
+ drop_connection(t);
+ }
+ /* merge stream lists */
+ grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+ &t->parsing_stream_map);
+ t->global.concurrent_stream_count =
+ grpc_chttp2_stream_map_size(&t->parsing_stream_map);
+ if (t->parsing.initial_window_update != 0) {
+ grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
+ update_global_window, t);
+ t->parsing.initial_window_update = 0;
+ }
+ /* handle higher level things */
+ grpc_chttp2_publish_reads(&t->global, &t->parsing);
+ t->parsing_active = 0;
+ }
+ if (i == nslices) {
+ grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
+ } else {
+ read_error_locked(t);
+ unref = 1;
+ }
+ unlock(t);
+ for (; i < nslices; i++) gpr_slice_unref(slices[i]);
+ break;
}
- gpr_slice_buffer_reset_and_unref(&t->read_buffer);
- unlock(t);
-
- if (keep_reading) {
- switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
- case GRPC_ENDPOINT_DONE:
- *success = 1;
- return 1;
- case GRPC_ENDPOINT_ERROR:
- *success = 0;
- return 1;
- case GRPC_ENDPOINT_PENDING:
- return 0;
- }
- } else {
+ if (unref) {
UNREF_TRANSPORT(t, "recv_data");
- return 0;
}
-
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
}
-static void recv_data(void *tp, int success) {
- grpc_chttp2_transport *t = tp;
-
- while (recv_data_loop(t, &success))
- ;
+static void reading_action(void *pt, int iomgr_success_ignored) {
+ grpc_chttp2_transport *t = pt;
+ grpc_endpoint_notify_on_read(t->ep, recv_data, t);
}
/*
@@ -1245,6 +1240,5 @@ void grpc_chttp2_transport_start_reading(grpc_transport *transport,
gpr_slice *slices, size_t nslices) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
- gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
- recv_data(t, 1);
+ recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
}