diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-18 07:20:29 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-18 07:20:29 -0700 |
commit | 000cd8f9f7346defc79fe6aa877af11b42ab5f1e (patch) | |
tree | 883d73a97471f63e616d02c1e17efc62b099c8ad /src/core/transport/chttp2_transport.c | |
parent | 38adec97e875c21cd9d6cc9d039664bdf4fdb889 (diff) |
Introduce call lists for moving work outside locks
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r-- | src/core/transport/chttp2_transport.c | 67 |
1 files changed, 17 insertions, 50 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 6376c397a2..50d5a3e8a8 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -499,32 +499,20 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_chttp2_transport *t) { - grpc_iomgr_closure *run_closures; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list run = GRPC_IOMGR_CALL_LIST_INIT; unlock_check_read_write_state(t); if (!t->writing_active && !t->closed && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); - grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); + grpc_iomgr_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1); prevent_endpoint_shutdown(t); } - run_closures = t->global.pending_closures_head; - t->global.pending_closures_head = NULL; - t->global.pending_closures_tail = NULL; - - grpc_connectivity_state_begin_flush(&t->channel_callback.state_tracker, &f); + GPR_SWAP(grpc_iomgr_call_list, run, t->global.run_at_unlock); gpr_mu_unlock(&t->mu); - - grpc_connectivity_state_end_flush(&f); - - while (run_closures) { - grpc_iomgr_closure *next = run_closures->next; - run_closures->cb(run_closures->cb_arg, run_closures->success); - run_closures = next; - } + grpc_iomgr_call_list_run(run); } /* @@ -676,8 +664,8 @@ static void perform_stream_op_locked( } } else { grpc_sopb_reset(op->send_ops); - grpc_chttp2_schedule_closure(transport_global, - stream_global->send_done_closure, 0); + grpc_iomgr_call_list_add(&transport_global->run_at_unlock, + stream_global->send_done_closure, 0); } } @@ -715,9 +703,8 @@ static void perform_stream_op_locked( op->bind_pollset); } - if (op->on_consumed) { - grpc_chttp2_schedule_closure(transport_global, op->on_consumed, 1); - } + grpc_iomgr_call_list_add(&transport_global->run_at_unlock, op->on_consumed, + 1); } static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, @@ -754,18 +741,12 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { lock(t); - if (op->on_consumed) { - grpc_chttp2_schedule_closure(&t->global, op->on_consumed, 1); - } + grpc_iomgr_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1); if (op->on_connectivity_state_change) { - if (grpc_connectivity_state_notify_on_state_change( - &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change) - .state_already_changed) { - grpc_chttp2_schedule_closure(&t->global, op->on_connectivity_state_change, - 1); - } + grpc_connectivity_state_notify_on_state_change( + &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change, &t->global.run_at_unlock); } if (op->send_goaway) { @@ -887,8 +868,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { if (stream_global->outgoing_sopb != NULL) { grpc_sopb_reset(stream_global->outgoing_sopb); stream_global->outgoing_sopb = NULL; - grpc_chttp2_schedule_closure(transport_global, - stream_global->send_done_closure, 1); + grpc_iomgr_call_list_add(&transport_global->run_at_unlock, + stream_global->send_done_closure, 1); } stream_global->read_closed = 1; if (!stream_global->published_cancelled) { @@ -938,8 +919,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { &stream_global->outstanding_metadata); grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); stream_global->published_state = *stream_global->publish_state = state; - grpc_chttp2_schedule_closure(transport_global, - stream_global->recv_done_closure, 1); + grpc_iomgr_call_list_add(&transport_global->run_at_unlock, + stream_global->recv_done_closure, 1); stream_global->recv_done_closure = NULL; stream_global->publish_sopb = NULL; stream_global->publish_state = NULL; @@ -1200,21 +1181,7 @@ static void connectivity_state_set( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); grpc_connectivity_state_set( &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, - state, reason); -} - -void grpc_chttp2_schedule_closure( - grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, - int success) { - closure->success = success; - if (transport_global->pending_closures_tail == NULL) { - transport_global->pending_closures_head = - transport_global->pending_closures_tail = closure; - } else { - transport_global->pending_closures_tail->next = closure; - transport_global->pending_closures_tail = closure; - } - closure->next = NULL; + state, reason, &transport_global->run_at_unlock); } /* |