diff options
Diffstat (limited to 'src/core/transport')
24 files changed, 199 insertions, 189 deletions
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index ef12c910cd..a58189a136 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -72,7 +72,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h index 7530f0f644..89a503eb7c 100644 --- a/src/core/transport/chttp2/frame_data.h +++ b/src/core/transport/chttp2/frame_data.h @@ -76,7 +76,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* create a slice with an empty data frame and is_last set */ gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id); diff --git a/src/core/transport/chttp2/frame_goaway.c b/src/core/transport/chttp2/frame_goaway.c index 1c2bce6736..d338b56611 100644 --- a/src/core/transport/chttp2/frame_goaway.c +++ b/src/core/transport/chttp2/frame_goaway.c @@ -65,7 +65,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; diff --git a/src/core/transport/chttp2/frame_goaway.h b/src/core/transport/chttp2/frame_goaway.h index ec991f4350..6f9ce94a0c 100644 --- a/src/core/transport/chttp2/frame_goaway.h +++ b/src/core/transport/chttp2/frame_goaway.h @@ -68,7 +68,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void grpc_chttp2_goaway_append(gpr_uint32 last_stream_id, gpr_uint32 error_code, gpr_slice debug_data, diff --git a/src/core/transport/chttp2/frame_ping.c b/src/core/transport/chttp2/frame_ping.c index 2fb8850a45..d4bf43eb37 100644 --- a/src/core/transport/chttp2/frame_ping.c +++ b/src/core/transport/chttp2/frame_ping.c @@ -71,7 +71,7 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; @@ -90,7 +90,7 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( for (ping = transport_parsing->pings.next; ping != &transport_parsing->pings; ping = ping->next) { if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) { - grpc_call_list_add(call_list, ping->on_recv, 1); + grpc_closure_list_add(closure_list, ping->on_recv, 1); } ping->next->prev = ping->prev; ping->prev->next = ping->next; diff --git a/src/core/transport/chttp2/frame_ping.h b/src/core/transport/chttp2/frame_ping.h index 70e19eb8ab..fb31147ed4 100644 --- a/src/core/transport/chttp2/frame_ping.h +++ b/src/core/transport/chttp2/frame_ping.h @@ -51,6 +51,6 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list); + grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_PING_H */ diff --git a/src/core/transport/chttp2/frame_rst_stream.c b/src/core/transport/chttp2/frame_rst_stream.c index 7cf8abe88f..4d8323c11d 100644 --- a/src/core/transport/chttp2/frame_rst_stream.c +++ b/src/core/transport/chttp2/frame_rst_stream.c @@ -73,7 +73,7 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h index 17d57fae5e..05d9e560f4 100644 --- a/src/core/transport/chttp2/frame_rst_stream.h +++ b/src/core/transport/chttp2/frame_rst_stream.h @@ -50,6 +50,6 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list); + grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */ diff --git a/src/core/transport/chttp2/frame_settings.c b/src/core/transport/chttp2/frame_settings.c index 78bd4bb09d..235ff5a352 100644 --- a/src/core/transport/chttp2/frame_settings.c +++ b/src/core/transport/chttp2/frame_settings.c @@ -140,7 +140,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( void *p, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_settings_parser *parser = p; const gpr_uint8 *cur = GPR_SLICE_START_PTR(slice); const gpr_uint8 *end = GPR_SLICE_END_PTR(slice); diff --git a/src/core/transport/chttp2/frame_settings.h b/src/core/transport/chttp2/frame_settings.h index a04a28b7da..7931b8b2ca 100644 --- a/src/core/transport/chttp2/frame_settings.h +++ b/src/core/transport/chttp2/frame_settings.h @@ -97,6 +97,6 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list); + grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_SETTINGS_H */ diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c index 51eb261346..b0f5f8698b 100644 --- a/src/core/transport/chttp2/frame_window_update.c +++ b/src/core/transport/chttp2/frame_window_update.c @@ -76,7 +76,7 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; diff --git a/src/core/transport/chttp2/frame_window_update.h b/src/core/transport/chttp2/frame_window_update.h index 7f1168e551..08931044e4 100644 --- a/src/core/transport/chttp2/frame_window_update.h +++ b/src/core/transport/chttp2/frame_window_update.h @@ -53,6 +53,6 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list); + grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H */ diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index e3b8e54e8d..e66a918fcd 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -1380,7 +1380,7 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p, grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_hpack_parser *parser = hpack_parser; if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice), GPR_SLICE_END_PTR(slice))) { diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h index c9ae6a9767..946f8e34ef 100644 --- a/src/core/transport/chttp2/hpack_parser.h +++ b/src/core/transport/chttp2/hpack_parser.h @@ -110,6 +110,6 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p, grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list); + grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_PARSER_H */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index b9dbbc25ee..14a6c909ee 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -268,7 +268,7 @@ struct grpc_chttp2_transport_parsing { grpc_chttp2_parse_error (*parser)( void *parser_user_data, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* received settings */ gpr_uint32 settings[GRPC_CHTTP2_NUM_SETTINGS]; @@ -469,22 +469,22 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); void grpc_chttp2_perform_writes( grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void grpc_chttp2_terminate_writing(void *transport_writing, int success, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); /** Process one slice of incoming data; return 1 if the connection is still viable after reading, or 0 if the connection should be torn down */ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, - gpr_slice slice, grpc_call_list *call_list); + gpr_slice slice, grpc_closure_list *closure_list); void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /** Get a writable stream returns non-zero if there was a stream available */ @@ -577,7 +577,7 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( void grpc_chttp2_add_incoming_goaway( grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, - gpr_slice goaway_text, grpc_call_list *call_list); + gpr_slice goaway_text, grpc_closure_list *closure_list); void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 2d95963f1f..f6ce35f9ac 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -60,7 +60,7 @@ static int init_skip_frame_parser( static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void grpc_chttp2_prepare_to_read( grpc_chttp2_transport_global *transport_global, @@ -93,7 +93,7 @@ void grpc_chttp2_prepare_to_read( void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_parsing *stream_parsing; @@ -131,9 +131,9 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, /* move goaway to the global state if we received one (it will be published later */ if (transport_parsing->goaway_received) { - grpc_chttp2_add_incoming_goaway(transport_global, - (gpr_uint32)transport_parsing->goaway_error, - transport_parsing->goaway_text, call_list); + grpc_chttp2_add_incoming_goaway( + transport_global, (gpr_uint32)transport_parsing->goaway_error, + transport_parsing->goaway_text, closure_list); transport_parsing->goaway_text = gpr_empty_slice(); transport_parsing->goaway_received = 0; } @@ -236,7 +236,7 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, } int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, - gpr_slice slice, grpc_call_list *call_list) { + gpr_slice slice, grpc_closure_list *closure_list) { gpr_uint8 *beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; @@ -366,7 +366,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, } if (transport_parsing->incoming_frame_size == 0) { if (!parse_frame_slice(transport_parsing, gpr_empty_slice(), 1, - call_list)) { + closure_list)) { return 0; } transport_parsing->incoming_stream = NULL; @@ -386,7 +386,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, if (!parse_frame_slice(transport_parsing, gpr_slice_sub_no_ref(slice, (size_t)(cur - beg), (size_t)(end - beg)), - 1, call_list)) { + 1, closure_list)) { return 0; } transport_parsing->deframe_state = GRPC_DTS_FH_0; @@ -400,7 +400,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice_sub_no_ref( slice, cur_offset, cur_offset + transport_parsing->incoming_frame_size), - 1, call_list)) { + 1, closure_list)) { return 0; } cur += transport_parsing->incoming_frame_size; @@ -410,7 +410,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, if (!parse_frame_slice(transport_parsing, gpr_slice_sub_no_ref(slice, (size_t)(cur - beg), (size_t)(end - beg)), - 0, call_list)) { + 0, closure_list)) { return 0; } transport_parsing->incoming_frame_size -= (gpr_uint32)(end - cur); @@ -473,7 +473,7 @@ static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { static grpc_chttp2_parse_error skip_parser( void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { return GRPC_CHTTP2_PARSE_OK; } @@ -793,12 +793,12 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream; switch (transport_parsing->parser(transport_parsing->parser_data, transport_parsing, stream_parsing, slice, - is_last, call_list)) { + is_last, closure_list)) { case GRPC_CHTTP2_PARSE_OK: if (stream_parsing) { grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 18f4bfbc77..e6f169c280 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -164,7 +164,7 @@ int grpc_chttp2_unlocking_check_writes( void grpc_chttp2_perform_writes( grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { GPR_ASSERT(transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams(transport_writing)); @@ -174,7 +174,7 @@ void grpc_chttp2_perform_writes( GPR_ASSERT(endpoint); grpc_endpoint_write(endpoint, &transport_writing->outbuf, - &transport_writing->done_cb, call_list); + &transport_writing->done_cb, closure_list); } static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { @@ -213,7 +213,7 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { void grpc_chttp2_cleanup_writing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; @@ -231,7 +231,8 @@ void grpc_chttp2_cleanup_writing( stream_global->outgoing_sopb->nops == 0) { GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE); stream_global->outgoing_sopb = NULL; - grpc_call_list_add(call_list, stream_global->send_done_closure, 1); + grpc_closure_list_add(closure_list, stream_global->send_done_closure, + 1); } } stream_global->writing_now = 0; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index acd7cbdc1d..766b55b5fa 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -78,31 +78,31 @@ int grpc_flowctl_trace = 0; static const grpc_transport_vtable vtable; static void lock(grpc_chttp2_transport *t); -static void unlock(grpc_chttp2_transport *t, grpc_call_list *call_list); +static void unlock(grpc_chttp2_transport *t, grpc_closure_list *closure_list); static void unlock_check_read_write_state(grpc_chttp2_transport *t, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(void *t, int iomgr_success_ignored, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, gpr_uint32 value); /** Endpoint callback to process incoming data */ -static void recv_data(void *tp, int success, grpc_call_list *call_list); +static void recv_data(void *tp, int success, grpc_closure_list *closure_list); /** Start disconnection chain */ static void drop_connection(grpc_chttp2_transport *t, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /** Perform a transport_op */ static void perform_stream_op_locked( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /** Cancel a stream: coming from the transport API */ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, @@ -117,26 +117,27 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global, /** Add endpoint from this transport to pollset */ static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset, - grpc_call_list *call_list); + grpc_closure_list *closure_list); static void add_to_pollset_set_locked(grpc_chttp2_transport *t, grpc_pollset_set *pollset_set, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /** Start new streams that have been created if we can */ static void maybe_start_some_streams( - grpc_chttp2_transport_global *transport_global, grpc_call_list *call_list); + grpc_chttp2_transport_global *transport_global, + grpc_closure_list *closure_list); static void connectivity_state_set( grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state, const char *reason, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ static void destruct_transport(grpc_chttp2_transport *t, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { size_t i; gpr_mu_lock(&t->mu); @@ -166,7 +167,7 @@ static void destruct_transport(grpc_chttp2_transport *t, grpc_chttp2_stream_map_destroy(&t->parsing_stream_map); grpc_chttp2_stream_map_destroy(&t->new_stream_map); grpc_connectivity_state_destroy(&t->channel_callback.state_tracker, - call_list); + closure_list); gpr_mu_unlock(&t->mu); gpr_mu_destroy(&t->mu); @@ -175,7 +176,7 @@ static void destruct_transport(grpc_chttp2_transport *t, and maybe they hold resources that need to be freed */ while (t->global.pings.next != &t->global.pings) { grpc_chttp2_outstanding_ping *ping = t->global.pings.next; - grpc_call_list_add(call_list, ping->on_recv, 0); + grpc_closure_list_add(closure_list, ping->on_recv, 0); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -190,12 +191,13 @@ static void destruct_transport(grpc_chttp2_transport *t, #ifdef REFCOUNTING_DEBUG #define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__) #define UNREF_TRANSPORT(t, r, cl) unref_transport(t, cl, r, __FILE__, __LINE__) -static void unref_transport(grpc_chttp2_transport *t, grpc_call_list *call_list, - const char *reason, const char *file, int line) { +static void unref_transport(grpc_chttp2_transport *t, + grpc_closure_list *closure_list, const char *reason, + const char *file, int line) { gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count, t->refs.count - 1, reason, file, line); if (!gpr_unref(&t->refs)) return; - destruct_transport(t, call_list); + destruct_transport(t, closure_list); } static void ref_transport(grpc_chttp2_transport *t, const char *reason, @@ -208,9 +210,9 @@ static void ref_transport(grpc_chttp2_transport *t, const char *reason, #define REF_TRANSPORT(t, r) ref_transport(t) #define UNREF_TRANSPORT(t, r, cl) unref_transport(t, cl) static void unref_transport(grpc_chttp2_transport *t, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { if (!gpr_unref(&t->refs)) return; - destruct_transport(t, call_list); + destruct_transport(t, closure_list); } static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } @@ -219,7 +221,8 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } static void init_transport(grpc_chttp2_transport *t, const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, - gpr_uint8 is_client, grpc_call_list *call_list) { + gpr_uint8 is_client, + grpc_closure_list *closure_list) { size_t i; int j; @@ -339,15 +342,16 @@ static void init_transport(grpc_chttp2_transport *t, } } -static void destroy_transport(grpc_transport *gt, grpc_call_list *call_list) { +static void destroy_transport(grpc_transport *gt, + grpc_closure_list *closure_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; lock(t); t->destroying = 1; - drop_connection(t, call_list); - unlock(t, call_list); + drop_connection(t, closure_list); + unlock(t, closure_list); - UNREF_TRANSPORT(t, "destroy", call_list); + UNREF_TRANSPORT(t, "destroy", closure_list); } /** block grpc_endpoint_shutdown being called until a paired @@ -358,41 +362,41 @@ static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) { } static void allow_endpoint_shutdown_locked(grpc_chttp2_transport *t, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { if (gpr_unref(&t->shutdown_ep_refs)) { if (t->ep) { - grpc_endpoint_shutdown(t->ep, call_list); + grpc_endpoint_shutdown(t->ep, closure_list); } } } static void allow_endpoint_shutdown_unlocked(grpc_chttp2_transport *t, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { if (gpr_unref(&t->shutdown_ep_refs)) { gpr_mu_lock(&t->mu); if (t->ep) { - grpc_endpoint_shutdown(t->ep, call_list); + grpc_endpoint_shutdown(t->ep, closure_list); } gpr_mu_unlock(&t->mu); } } static void destroy_endpoint(grpc_chttp2_transport *t, - grpc_call_list *call_list) { - grpc_endpoint_destroy(t->ep, call_list); + grpc_closure_list *closure_list) { + grpc_endpoint_destroy(t->ep, closure_list); t->ep = NULL; /* safe because we'll still have the ref for write */ - UNREF_TRANSPORT(t, "disconnect", call_list); + UNREF_TRANSPORT(t, "disconnect", closure_list); } static void close_transport_locked(grpc_chttp2_transport *t, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { if (!t->closed) { t->closed = 1; connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE, - "close_transport", call_list); + "close_transport", closure_list); if (t->ep) { - allow_endpoint_shutdown_locked(t, call_list); + allow_endpoint_shutdown_locked(t, closure_list); } } } @@ -400,7 +404,7 @@ static void close_transport_locked(grpc_chttp2_transport *t, static int init_stream(grpc_transport *gt, grpc_stream *gs, const void *server_data, grpc_transport_stream_op *initial_op, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; @@ -432,14 +436,14 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, } if (initial_op) - perform_stream_op_locked(&t->global, &s->global, initial_op, call_list); - unlock(t, call_list); + perform_stream_op_locked(&t->global, &s->global, initial_op, closure_list); + unlock(t, closure_list); return 0; } static void destroy_stream(grpc_transport *gt, grpc_stream *gs, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; int i; @@ -450,7 +454,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs, s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { - close_transport_locked(t, call_list); + close_transport_locked(t, closure_list); } if (!t->parsing_active && s->global.id) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, @@ -480,7 +484,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs, grpc_chttp2_incoming_metadata_live_op_buffer_end( &s->global.outstanding_metadata); - UNREF_TRANSPORT(t, "stream", call_list); + UNREF_TRANSPORT(t, "stream", closure_list); } grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( @@ -515,13 +519,13 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } -static void unlock(grpc_chttp2_transport *t, grpc_call_list *call_list) { - unlock_check_read_write_state(t, call_list); +static void unlock(grpc_chttp2_transport *t, grpc_closure_list *closure_list) { + unlock_check_read_write_state(t, closure_list); if (!t->writing_active && !t->closed && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); - grpc_call_list_add(call_list, &t->writing_action, 1); + grpc_closure_list_add(closure_list, &t->writing_action, 1); prevent_endpoint_shutdown(t); } @@ -548,53 +552,54 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr; grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); lock(t); - allow_endpoint_shutdown_locked(t, call_list); + allow_endpoint_shutdown_locked(t, closure_list); if (!success) { - drop_connection(t, call_list); + drop_connection(t, closure_list); } /* cleanup writing related jazz */ - grpc_chttp2_cleanup_writing(&t->global, &t->writing, call_list); + grpc_chttp2_cleanup_writing(&t->global, &t->writing, closure_list); /* leave the writing flag up on shutdown to prevent further writes in unlock() from starting */ t->writing_active = 0; if (t->ep && !t->endpoint_reading) { - destroy_endpoint(t, call_list); + destroy_endpoint(t, closure_list); } - unlock(t, call_list); + unlock(t, closure_list); - UNREF_TRANSPORT(t, "writing", call_list); + UNREF_TRANSPORT(t, "writing", closure_list); } static void writing_action(void *gt, int iomgr_success_ignored, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_transport *t = gt; - grpc_chttp2_perform_writes(&t->writing, t->ep, call_list); + grpc_chttp2_perform_writes(&t->writing, t->ep, closure_list); } void grpc_chttp2_add_incoming_goaway( grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, - gpr_slice goaway_text, grpc_call_list *call_list) { + gpr_slice goaway_text, grpc_closure_list *closure_list) { char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg); gpr_free(msg); gpr_slice_unref(goaway_text); transport_global->seen_goaway = 1; connectivity_state_set(transport_global, GRPC_CHANNEL_FATAL_FAILURE, - "got_goaway", call_list); + "got_goaway", closure_list); } static void maybe_start_some_streams( - grpc_chttp2_transport_global *transport_global, grpc_call_list *call_list) { + grpc_chttp2_transport_global *transport_global, + grpc_closure_list *closure_list) { grpc_chttp2_stream_global *stream_global; /* start streams where we have free grpc_chttp2_stream ids and free * concurrency */ @@ -616,7 +621,7 @@ static void maybe_start_some_streams( if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE, - "no_more_stream_ids", call_list); + "no_more_stream_ids", closure_list); } stream_global->outgoing_window = @@ -647,7 +652,7 @@ static void maybe_start_some_streams( static void perform_stream_op_locked( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(transport_global, stream_global, op->cancel_with_status); } @@ -674,13 +679,13 @@ static void perform_stream_op_locked( transport_global->is_client ? "CLI" : "SVR", stream_global)); grpc_chttp2_list_add_waiting_for_concurrency(transport_global, stream_global); - maybe_start_some_streams(transport_global, call_list); + maybe_start_some_streams(transport_global, closure_list); } else if (stream_global->outgoing_window > 0) { grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } else { grpc_sopb_reset(op->send_ops); - grpc_call_list_add(call_list, stream_global->send_done_closure, 0); + grpc_closure_list_add(closure_list, stream_global->send_done_closure, 0); } } @@ -715,21 +720,21 @@ static void perform_stream_op_locked( if (op->bind_pollset) { add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global), - op->bind_pollset, call_list); + op->bind_pollset, closure_list); } - grpc_call_list_add(call_list, op->on_consumed, 1); + grpc_closure_list_add(closure_list, op->on_consumed, 1); } static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op *op, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; lock(t); - perform_stream_op_locked(&t->global, &s->global, op, call_list); - unlock(t, call_list); + perform_stream_op_locked(&t->global, &s->global, op, closure_list); + unlock(t, closure_list); } static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { @@ -750,18 +755,18 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { } static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; int close_transport = 0; lock(t); - grpc_call_list_add(call_list, op->on_consumed, 1); + grpc_closure_list_add(closure_list, op->on_consumed, 1); if (op->on_connectivity_state_change) { grpc_connectivity_state_notify_on_state_change( &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change, call_list); + op->on_connectivity_state_change, closure_list); } if (op->send_goaway) { @@ -780,11 +785,11 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op, } if (op->bind_pollset) { - add_to_pollset_locked(t, op->bind_pollset, call_list); + add_to_pollset_locked(t, op->bind_pollset, closure_list); } if (op->bind_pollset_set) { - add_to_pollset_set_locked(t, op->bind_pollset_set, call_list); + add_to_pollset_set_locked(t, op->bind_pollset_set, closure_list); } if (op->send_ping) { @@ -792,15 +797,15 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op, } if (op->disconnect) { - close_transport_locked(t, call_list); + close_transport_locked(t, closure_list); } - unlock(t, call_list); + unlock(t, closure_list); if (close_transport) { lock(t); - close_transport_locked(t, call_list); - unlock(t, call_list); + close_transport_locked(t, closure_list); + unlock(t, closure_list); } } @@ -817,7 +822,7 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, } static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { size_t new_stream_count; grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id); @@ -832,7 +837,7 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id, grpc_chttp2_parsing_become_skip_parser(&t->parsing); } if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { - close_transport_locked(t, call_list); + close_transport_locked(t, closure_list); } new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + @@ -840,12 +845,12 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id, GPR_ASSERT(new_stream_count <= GPR_UINT32_MAX); if (new_stream_count != t->global.concurrent_stream_count) { t->global.concurrent_stream_count = (gpr_uint32)new_stream_count; - maybe_start_some_streams(&t->global, call_list); + maybe_start_some_streams(&t->global, closure_list); } } static void unlock_check_read_write_state(grpc_chttp2_transport *t, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_stream_global *stream_global; grpc_stream_state state; @@ -859,7 +864,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t, GPR_ASSERT(stream_global->in_stream_map); GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_OPEN); GPR_ASSERT(stream_global->read_closed); - remove_stream(t, stream_global->id, call_list); + remove_stream(t, stream_global->id, closure_list); grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); } @@ -885,7 +890,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t, if (stream_global->outgoing_sopb != NULL) { grpc_sopb_reset(stream_global->outgoing_sopb); stream_global->outgoing_sopb = NULL; - grpc_call_list_add(call_list, stream_global->send_done_closure, 1); + grpc_closure_list_add(closure_list, stream_global->send_done_closure, + 1); } stream_global->read_closed = 1; if (!stream_global->published_cancelled) { @@ -907,7 +913,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t, grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, stream_global); } else { - remove_stream(t, stream_global->id, call_list); + remove_stream(t, stream_global->id, closure_list); } } if (!stream_global->publish_sopb) { @@ -935,7 +941,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t, &stream_global->outstanding_metadata); grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); stream_global->published_state = *stream_global->publish_state = state; - grpc_call_list_add(call_list, stream_global->recv_done_closure, 1); + grpc_closure_list_add(closure_list, stream_global->recv_done_closure, 1); stream_global->recv_done_closure = NULL; stream_global->publish_sopb = NULL; stream_global->publish_state = NULL; @@ -1071,8 +1077,8 @@ static void end_all_the_calls(grpc_chttp2_transport *t) { } static void drop_connection(grpc_chttp2_transport *t, - grpc_call_list *call_list) { - close_transport_locked(t, call_list); + grpc_closure_list *closure_list) { + close_transport_locked(t, closure_list); end_all_the_calls(t); } @@ -1098,15 +1104,15 @@ static void update_global_window(void *args, gpr_uint32 id, void *stream) { } static void read_error_locked(grpc_chttp2_transport *t, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { t->endpoint_reading = 0; if (!t->writing_active && t->ep) { - destroy_endpoint(t, call_list); + destroy_endpoint(t, closure_list); } } /* tcp read callback */ -static void recv_data(void *tp, int success, grpc_call_list *call_list) { +static void recv_data(void *tp, int success, grpc_closure_list *closure_list) { size_t i; int keep_reading = 0; grpc_chttp2_transport *t = tp; @@ -1123,12 +1129,12 @@ static void recv_data(void *tp, int success, grpc_call_list *call_list) { gpr_mu_unlock(&t->mu); for (; i < t->read_buffer.count && grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i], - call_list); + closure_list); i++) ; gpr_mu_lock(&t->mu); if (i != t->read_buffer.count) { - drop_connection(t, call_list); + drop_connection(t, closure_list); } /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, @@ -1141,26 +1147,26 @@ static void recv_data(void *tp, int success, grpc_call_list *call_list) { t->parsing.initial_window_update = 0; } /* handle higher level things */ - grpc_chttp2_publish_reads(&t->global, &t->parsing, call_list); + grpc_chttp2_publish_reads(&t->global, &t->parsing, closure_list); t->parsing_active = 0; } if (!success || i != t->read_buffer.count) { - drop_connection(t, call_list); - read_error_locked(t, call_list); + drop_connection(t, closure_list); + read_error_locked(t, closure_list); } else if (!t->closed) { keep_reading = 1; REF_TRANSPORT(t, "keep_reading"); prevent_endpoint_shutdown(t); } gpr_slice_buffer_reset_and_unref(&t->read_buffer); - unlock(t, call_list); + unlock(t, closure_list); if (keep_reading) { - grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data, call_list); - allow_endpoint_shutdown_unlocked(t, call_list); - UNREF_TRANSPORT(t, "keep_reading", call_list); + grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data, closure_list); + allow_endpoint_shutdown_unlocked(t, closure_list); + UNREF_TRANSPORT(t, "keep_reading", closure_list); } else { - UNREF_TRANSPORT(t, "recv_data", call_list); + UNREF_TRANSPORT(t, "recv_data", closure_list); } } @@ -1171,12 +1177,12 @@ static void recv_data(void *tp, int success, grpc_call_list *call_list) { static void connectivity_state_set( grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state, const char *reason, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); grpc_connectivity_state_set( &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, - state, reason, call_list); + state, reason, closure_list); } /* @@ -1185,17 +1191,17 @@ static void connectivity_state_set( static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { if (t->ep) { - grpc_endpoint_add_to_pollset(t->ep, pollset, call_list); + grpc_endpoint_add_to_pollset(t->ep, pollset, closure_list); } } static void add_to_pollset_set_locked(grpc_chttp2_transport *t, grpc_pollset_set *pollset_set, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { if (t->ep) { - grpc_endpoint_add_to_pollset_set(t->ep, pollset_set, call_list); + grpc_endpoint_add_to_pollset_set(t->ep, pollset_set, closure_list); } } @@ -1234,7 +1240,8 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, * INTEGRATION GLUE */ -static char *chttp2_get_peer(grpc_transport *t, grpc_call_list *call_list) { +static char *chttp2_get_peer(grpc_transport *t, + grpc_closure_list *closure_list) { return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); } @@ -1248,17 +1255,17 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), grpc_transport *grpc_create_chttp2_transport( const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, - int is_client, grpc_call_list *call_list) { + int is_client, grpc_closure_list *closure_list) { grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); - init_transport(t, channel_args, ep, mdctx, is_client != 0, call_list); + init_transport(t, channel_args, ep, mdctx, is_client != 0, closure_list); return &t->base; } void grpc_chttp2_transport_start_reading(grpc_transport *transport, gpr_slice *slices, size_t nslices, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ gpr_slice_buffer_addn(&t->read_buffer, slices, nslices); - recv_data(t, 1, call_list); + recv_data(t, 1, closure_list); } diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h index e963e16707..93a654bc83 100644 --- a/src/core/transport/chttp2_transport.h +++ b/src/core/transport/chttp2_transport.h @@ -42,10 +42,11 @@ extern int grpc_flowctl_trace; grpc_transport *grpc_create_chttp2_transport( const grpc_channel_args *channel_args, grpc_endpoint *ep, - grpc_mdctx *metadata_context, int is_client, grpc_call_list *call_list); + grpc_mdctx *metadata_context, int is_client, + grpc_closure_list *closure_list); void grpc_chttp2_transport_start_reading(grpc_transport *transport, gpr_slice *slices, size_t nslices, - grpc_call_list *call_list); + grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H */ diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index d04c3a2209..638ec62e73 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -67,7 +67,7 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, } void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { int success; grpc_connectivity_state_watcher *w; while ((w = tracker->watchers)) { @@ -79,7 +79,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker, } else { success = 0; } - grpc_call_list_add(call_list, w->notify, success); + grpc_closure_list_add(closure_list, w->notify, success); gpr_free(w); } gpr_free(tracker->name); @@ -96,7 +96,7 @@ grpc_connectivity_state grpc_connectivity_state_check( int grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, - grpc_closure *notify, grpc_call_list *call_list) { + grpc_closure *notify, grpc_closure_list *closure_list) { if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s] notify=%p", tracker->name, grpc_connectivity_state_name(*current), @@ -104,7 +104,7 @@ int grpc_connectivity_state_notify_on_state_change( } if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_call_list_add(call_list, notify, 1); + grpc_closure_list_add(closure_list, notify, 1); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -118,7 +118,7 @@ int grpc_connectivity_state_notify_on_state_change( void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, const char *reason, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_connectivity_state_watcher *w; if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name, @@ -133,7 +133,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; - grpc_call_list_add(call_list, w->notify, 1); + grpc_closure_list_add(closure_list, w->notify, 1); gpr_free(w); } } diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index 6ade8e19f7..26f8874fbc 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -61,13 +61,14 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state, const char *name); void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /** Set connectivity state; not thread safe; access must be serialized with an * external lock */ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, - const char *reason, grpc_call_list *call_list); + const char *reason, + grpc_closure_list *closure_list); grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); @@ -75,6 +76,6 @@ grpc_connectivity_state grpc_connectivity_state_check( /** Return 1 if the channel should start connecting, 0 otherwise */ int grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, - grpc_closure *notify, grpc_call_list *call_list); + grpc_closure *notify, grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */ diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index f7c87c1e90..24d549a29b 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -41,46 +41,46 @@ size_t grpc_transport_stream_size(grpc_transport *transport) { } void grpc_transport_destroy(grpc_transport *transport, - grpc_call_list *call_list) { - transport->vtable->destroy(transport, call_list); + grpc_closure_list *closure_list) { + transport->vtable->destroy(transport, closure_list); } int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, const void *server_data, grpc_transport_stream_op *initial_op, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { return transport->vtable->init_stream(transport, stream, server_data, - initial_op, call_list); + initial_op, closure_list); } void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, grpc_transport_stream_op *op, - grpc_call_list *call_list) { - transport->vtable->perform_stream_op(transport, stream, op, call_list); + grpc_closure_list *closure_list) { + transport->vtable->perform_stream_op(transport, stream, op, closure_list); } void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op, - grpc_call_list *call_list) { - transport->vtable->perform_op(transport, op, call_list); + grpc_closure_list *closure_list) { + transport->vtable->perform_op(transport, op, closure_list); } void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream, - grpc_call_list *call_list) { - transport->vtable->destroy_stream(transport, stream, call_list); + grpc_closure_list *closure_list) { + transport->vtable->destroy_stream(transport, stream, closure_list); } char *grpc_transport_get_peer(grpc_transport *transport, - grpc_call_list *call_list) { - return transport->vtable->get_peer(transport, call_list); + grpc_closure_list *closure_list) { + return transport->vtable->get_peer(transport, closure_list); } -void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op, - grpc_call_list *call_list) { - grpc_call_list_add(call_list, op->on_done_recv, 0); - grpc_call_list_add(call_list, op->on_done_send, 0); - grpc_call_list_add(call_list, op->on_consumed, 0); +void grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op *op, grpc_closure_list *closure_list) { + grpc_closure_list_add(closure_list, op->on_done_recv, 0); + grpc_closure_list_add(closure_list, op->on_done_send, 0); + grpc_closure_list_add(closure_list, op->on_consumed, 0); } void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, @@ -105,11 +105,11 @@ typedef struct { } close_message_data; static void free_message(void *p, int iomgr_success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { close_message_data *cmd = p; gpr_slice_unref(cmd->message); if (cmd->then_call != NULL) { - cmd->then_call->cb(cmd->then_call->cb_arg, iomgr_success, call_list); + cmd->then_call->cb(cmd->then_call->cb_arg, iomgr_success, closure_list); } gpr_free(cmd); } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index d5383ad11a..1d6dd1e735 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -137,7 +137,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport); int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, const void *server_data, grpc_transport_stream_op *initial_op, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Destroy transport data for a stream. @@ -151,10 +151,10 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, caller, but any child memory must be cleaned up) */ void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream, - grpc_call_list *call_list); + grpc_closure_list *closure_list); -void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op, - grpc_call_list *call_list); +void grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op *op, grpc_closure_list *closure_list); void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, grpc_status_code status); @@ -177,10 +177,10 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, grpc_transport_stream_op *op, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Send a ping on a transport @@ -196,10 +196,10 @@ void grpc_transport_close(grpc_transport *transport); /* Destroy the transport */ void grpc_transport_destroy(grpc_transport *transport, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Get the transports peer */ char *grpc_transport_get_peer(grpc_transport *transport, - grpc_call_list *call_list); + grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 9adb16b941..c7eebc9bb2 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -45,26 +45,26 @@ typedef struct grpc_transport_vtable { int (*init_stream)(grpc_transport *self, grpc_stream *stream, const void *server_data, grpc_transport_stream_op *initial_op, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* implementation of grpc_transport_perform_stream_op */ void (*perform_stream_op)(grpc_transport *self, grpc_stream *stream, grpc_transport_stream_op *op, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* implementation of grpc_transport_perform_op */ void (*perform_op)(grpc_transport *self, grpc_transport_op *op, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_transport *self, grpc_stream *stream, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* implementation of grpc_transport_destroy */ - void (*destroy)(grpc_transport *self, grpc_call_list *call_list); + void (*destroy)(grpc_transport *self, grpc_closure_list *closure_list); /* implementation of grpc_transport_get_peer */ - char *(*get_peer)(grpc_transport *self, grpc_call_list *call_list); + char *(*get_peer)(grpc_transport *self, grpc_closure_list *closure_list); } grpc_transport_vtable; /* an instance of a grpc transport */ |