diff options
Diffstat (limited to 'src/core/transport')
-rw-r--r-- | src/core/transport/chttp2/internal.h | 15 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 21 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 198 |
3 files changed, 139 insertions, 95 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 42cf0ecd5b..7a42de9245 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -214,6 +214,8 @@ typedef struct { grpc_chttp2_hpack_compressor hpack_compressor; /** is this a client? */ gpr_uint8 is_client; + /** callback for when writing is done */ + grpc_iomgr_closure done_cb; } grpc_chttp2_transport_writing; struct grpc_chttp2_transport_parsing { @@ -291,6 +293,9 @@ struct grpc_chttp2_transport { gpr_refcount refs; char *peer_string; + /** when this drops to zero it's safe to shutdown the endpoint */ + gpr_refcount shutdown_ep_refs; + gpr_mu mu; /** is the transport destroying itself? */ @@ -329,8 +334,11 @@ struct grpc_chttp2_transport { /** closure to execute writing */ grpc_iomgr_closure writing_action; - /** closure to start reading from the endpoint */ - grpc_iomgr_closure reading_action; + /** closure to finish reading from the endpoint */ + grpc_iomgr_closure recv_data; + + /** incoming read bytes */ + gpr_slice_buffer read_buffer; /** address to place a newly accepted stream - set and unset by grpc_chttp2_parsing_accept_stream; used by init_stream to @@ -463,8 +471,7 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); void grpc_chttp2_perform_writes( grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint); -void grpc_chttp2_terminate_writing( - grpc_chttp2_transport_writing *transport_writing, int success); +void grpc_chttp2_terminate_writing(void *transport_writing, int success); void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 123061b3fc..2c8c48f47b 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -37,7 +37,6 @@ #include <grpc/support/log.h> static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); -static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status); int grpc_chttp2_unlocking_check_writes( grpc_chttp2_transport_global *transport_global, @@ -165,16 +164,15 @@ void grpc_chttp2_perform_writes( GPR_ASSERT(transport_writing->outbuf.count > 0); GPR_ASSERT(endpoint); - switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices, - transport_writing->outbuf.count, finish_write_cb, - transport_writing)) { - case GRPC_ENDPOINT_WRITE_DONE: + switch (grpc_endpoint_write(endpoint, &transport_writing->outbuf, + &transport_writing->done_cb)) { + case GRPC_ENDPOINT_DONE: grpc_chttp2_terminate_writing(transport_writing, 1); break; - case GRPC_ENDPOINT_WRITE_ERROR: + case GRPC_ENDPOINT_ERROR: grpc_chttp2_terminate_writing(transport_writing, 0); break; - case GRPC_ENDPOINT_WRITE_PENDING: + case GRPC_ENDPOINT_PENDING: break; } } @@ -209,12 +207,6 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { } } -static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) { - grpc_chttp2_transport_writing *transport_writing = tw; - grpc_chttp2_terminate_writing(transport_writing, - write_status == GRPC_ENDPOINT_CB_OK); -} - void grpc_chttp2_cleanup_writing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { @@ -243,6 +235,5 @@ void grpc_chttp2_cleanup_writing( grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); } - transport_writing->outbuf.count = 0; - transport_writing->outbuf.length = 0; + gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf); } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 1bbd210e46..aa6a860c67 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -84,15 +84,13 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(void *t, int iomgr_success_ignored); -static void reading_action(void *t, int iomgr_success_ignored); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, gpr_uint32 value); /** Endpoint callback to process incoming data */ -static void recv_data(void *tp, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error); +static void recv_data(void *tp, int success); /** Start disconnection chain */ static void drop_connection(grpc_chttp2_transport *t); @@ -143,6 +141,7 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor); gpr_slice_buffer_destroy(&t->parsing.qbuf); + gpr_slice_buffer_destroy(&t->read_buffer); grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser); @@ -223,6 +222,8 @@ static void init_transport(grpc_chttp2_transport *t, t->ep = ep; /* one ref is for destroy, the other for when ep becomes NULL */ gpr_ref_init(&t->refs, 2); + /* ref is dropped at transport close() */ + gpr_ref_init(&t->shutdown_ep_refs, 1); gpr_mu_init(&t->mu); grpc_mdctx_ref(mdctx); t->peer_string = grpc_endpoint_get_peer(ep); @@ -249,12 +250,16 @@ static void init_transport(grpc_chttp2_transport *t, gpr_slice_buffer_init(&t->writing.outbuf); grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx); grpc_iomgr_closure_init(&t->writing_action, writing_action, t); - grpc_iomgr_closure_init(&t->reading_action, reading_action, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context); + grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, + &t->writing); + grpc_iomgr_closure_init(&t->recv_data, recv_data, t); + gpr_slice_buffer_init(&t->read_buffer); + if (is_client) { gpr_slice_buffer_add( &t->global.qbuf, @@ -333,13 +338,45 @@ static void destroy_transport(grpc_transport *gt) { UNREF_TRANSPORT(t, "destroy"); } +/** block grpc_endpoint_shutdown being called until a paired + allow_endpoint_shutdown is made */ +static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) { + GPR_ASSERT(t->ep); + gpr_ref(&t->shutdown_ep_refs); +} + +static void allow_endpoint_shutdown_locked(grpc_chttp2_transport *t) { + if (gpr_unref(&t->shutdown_ep_refs)) { + if (t->ep) { + grpc_endpoint_shutdown(t->ep); + } + } +} + +static void allow_endpoint_shutdown_unlocked(grpc_chttp2_transport *t) { + if (gpr_unref(&t->shutdown_ep_refs)) { + gpr_mu_lock(&t->mu); + if (t->ep) { + grpc_endpoint_shutdown(t->ep); + } + gpr_mu_unlock(&t->mu); + } +} + +static void destroy_endpoint(grpc_chttp2_transport *t) { + grpc_endpoint_destroy(t->ep); + t->ep = NULL; + UNREF_TRANSPORT( + t, "disconnect"); /* safe because we'll still have the ref for write */ +} + static void close_transport_locked(grpc_chttp2_transport *t) { if (!t->closed) { t->closed = 1; connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE, "close_transport"); if (t->ep) { - grpc_endpoint_shutdown(t->ep); + allow_endpoint_shutdown_locked(t); } } } @@ -468,6 +505,7 @@ static void unlock(grpc_chttp2_transport *t) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); + prevent_endpoint_shutdown(t); } run_closures = t->global.pending_closures_head; @@ -502,12 +540,14 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } } -void grpc_chttp2_terminate_writing( - grpc_chttp2_transport_writing *transport_writing, int success) { +void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) { + grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr; grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); lock(t); + allow_endpoint_shutdown_locked(t); + if (!success) { drop_connection(t); } @@ -519,10 +559,7 @@ void grpc_chttp2_terminate_writing( from starting */ t->writing_active = 0; if (t->ep && !t->endpoint_reading) { - grpc_endpoint_destroy(t->ep); - t->ep = NULL; - UNREF_TRANSPORT( - t, "disconnect"); /* safe because we'll still have the ref for write */ + destroy_endpoint(t); } unlock(t); @@ -1052,82 +1089,90 @@ static void update_global_window(void *args, gpr_uint32 id, void *stream) { static void read_error_locked(grpc_chttp2_transport *t) { t->endpoint_reading = 0; if (!t->writing_active && t->ep) { - grpc_endpoint_destroy(t->ep); - t->ep = NULL; - /* safe as we still have a ref for read */ - UNREF_TRANSPORT(t, "disconnect"); + destroy_endpoint(t); } } /* tcp read callback */ -static void recv_data(void *tp, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { - grpc_chttp2_transport *t = tp; +static int recv_data_loop(grpc_chttp2_transport *t, int *success) { size_t i; - int unref = 0; + int keep_reading = 0; - switch (error) { - case GRPC_ENDPOINT_CB_SHUTDOWN: - case GRPC_ENDPOINT_CB_EOF: - case GRPC_ENDPOINT_CB_ERROR: - lock(t); + lock(t); + i = 0; + GPR_ASSERT(!t->parsing_active); + if (!t->closed) { + t->parsing_active = 1; + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, + &t->parsing_stream_map); + grpc_chttp2_prepare_to_read(&t->global, &t->parsing); + gpr_mu_unlock(&t->mu); + for (; i < t->read_buffer.count && + grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]); + i++) + ; + gpr_mu_lock(&t->mu); + if (i != t->read_buffer.count) { drop_connection(t); - read_error_locked(t); - unlock(t); - unref = 1; - for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]); - break; - case GRPC_ENDPOINT_CB_OK: - lock(t); - i = 0; - GPR_ASSERT(!t->parsing_active); - if (!t->closed) { - t->parsing_active = 1; - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - grpc_chttp2_prepare_to_read(&t->global, &t->parsing); - gpr_mu_unlock(&t->mu); - for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); - i++) { - gpr_slice_unref(slices[i]); - } - gpr_mu_lock(&t->mu); - if (i != nslices) { - drop_connection(t); - } - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - t->global.concurrent_stream_count = - grpc_chttp2_stream_map_size(&t->parsing_stream_map); - if (t->parsing.initial_window_update != 0) { - grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, - update_global_window, t); - t->parsing.initial_window_update = 0; - } - /* handle higher level things */ - grpc_chttp2_publish_reads(&t->global, &t->parsing); - t->parsing_active = 0; - } - if (i == nslices) { - grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1); - } else { - read_error_locked(t); - unref = 1; - } - unlock(t); - for (; i < nslices; i++) gpr_slice_unref(slices[i]); - break; + } + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, + &t->parsing_stream_map); + t->global.concurrent_stream_count = + grpc_chttp2_stream_map_size(&t->parsing_stream_map); + if (t->parsing.initial_window_update != 0) { + grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, + update_global_window, t); + t->parsing.initial_window_update = 0; + } + /* handle higher level things */ + grpc_chttp2_publish_reads(&t->global, &t->parsing); + t->parsing_active = 0; } - if (unref) { + if (!*success || i != t->read_buffer.count) { + drop_connection(t); + read_error_locked(t); + } else if (!t->closed) { + keep_reading = 1; + REF_TRANSPORT(t, "keep_reading"); + prevent_endpoint_shutdown(t); + } + gpr_slice_buffer_reset_and_unref(&t->read_buffer); + unlock(t); + + if (keep_reading) { + int ret = -1; + switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) { + case GRPC_ENDPOINT_DONE: + *success = 1; + ret = 1; + break; + case GRPC_ENDPOINT_ERROR: + *success = 0; + ret = 1; + break; + case GRPC_ENDPOINT_PENDING: + ret = 0; + break; + } + allow_endpoint_shutdown_unlocked(t); + UNREF_TRANSPORT(t, "keep_reading"); + return ret; + } else { UNREF_TRANSPORT(t, "recv_data"); + return 0; } + + gpr_log(GPR_ERROR, "should never reach here"); + abort(); } -static void reading_action(void *pt, int iomgr_success_ignored) { - grpc_chttp2_transport *t = pt; - grpc_endpoint_notify_on_read(t->ep, recv_data, t); +static void recv_data(void *tp, int success) { + grpc_chttp2_transport *t = tp; + + while (recv_data_loop(t, &success)) + ; } /* @@ -1240,5 +1285,6 @@ void grpc_chttp2_transport_start_reading(grpc_transport *transport, gpr_slice *slices, size_t nslices) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ - recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); + gpr_slice_buffer_addn(&t->read_buffer, slices, nslices); + recv_data(t, 1); } |