diff options
author | Craig Tiller <ctiller@google.com> | 2015-06-11 16:26:03 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-06-11 16:26:03 -0700 |
commit | 99f8055965d8577d876763a70d5feb75f5752d24 (patch) | |
tree | 05b3cd61daa4e0659ffa5c5f2d4637fe5b764fb8 | |
parent | 3cbfcb4dcd3cbf785f7391a8e4c5f36e6615c9d4 (diff) |
Splitting progress
-rw-r--r-- | src/core/transport/chttp2_transport.c | 149 |
1 files changed, 111 insertions, 38 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index f3af5b1320..c4fa13c86c 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -221,8 +221,6 @@ typedef struct { struct transport { grpc_transport base; /* must be first */ - const grpc_transport_callbacks *cb; - void *cb_user_data; grpc_endpoint *ep; grpc_mdctx *metadata_context; gpr_refcount refs; @@ -233,10 +231,6 @@ struct transport { /* basic state management - what are we doing at the moment? */ gpr_uint8 reading; - gpr_uint8 parsing; - gpr_uint8 writing; - /** are we calling back (via cb) with a channel-level event */ - gpr_uint8 calling_back_channel; /** are we calling back any grpc_transport_op completion events */ gpr_uint8 calling_back_ops; gpr_uint8 destroying; @@ -304,9 +298,6 @@ struct transport { grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); - gpr_slice_buffer outbuf; - gpr_slice_buffer qbuf; - stream_list lists[STREAM_LIST_COUNT]; grpc_chttp2_stream_map stream_map; @@ -318,9 +309,49 @@ struct transport { size_t ping_count; size_t ping_capacity; gpr_int64 ping_counter; + + struct { + /** data to write next write */ + gpr_slice_buffer qbuf; + } global; + + struct { + /** is a thread currently writing */ + gpr_uint8 executing; + /** closure to execute this action */ + grpc_iomgr_closure action; + /** data to write now */ + gpr_slice_buffer outbuf; + } writing; + + struct { + /** is a thread currently parsing */ + gpr_uint8 executing; + /** data to write later - after parsing */ + gpr_slice_buffer qbuf; + } parsing; + + struct { + /** is a thread currently performing channel callbacks */ + gpr_uint8 executing; + const grpc_transport_callbacks *cb; + void *cb_user_data; + } channel_callback; }; struct stream { + struct { + int unused; + } global; + + struct { + int unused; + } writing; + + struct { + int unused; + } parsing; + gpr_uint32 id; gpr_uint32 incoming_window; @@ -361,6 +392,13 @@ struct stream { grpc_stream_op_buffer callback_sopb; }; +#define MAX_POST_ACTIONS 8 + +typedef struct { + size_t num_post_actions; + grpc_iomgr_closure *post_actions[MAX_POST_ACTIONS]; +} unlock_ctx; + static const grpc_transport_vtable vtable; static void push_setting(transport *t, grpc_chttp2_setting_id id, @@ -376,6 +414,12 @@ static void perform_write(transport *t, grpc_endpoint *ep); static void lock(transport *t); static void unlock(transport *t); +static void unlock_check_writes(transport* t, unlock_ctx *uctx); + static void unlock_check_cancellations(transport* t, unlock_ctx *uctx); + static void unlock_check_parser(transport* t, unlock_ctx *uctx); + static void unlock_check_op_callbacks(transport* t, unlock_ctx *uctx); + static void unlock_check_channel_callbacks(transport* t, unlock_ctx *uctx); + static void drop_connection(transport *t); static void end_all_the_calls(transport *t); @@ -426,8 +470,9 @@ static void destruct_transport(transport *t) { GPR_ASSERT(t->ep == NULL); - gpr_slice_buffer_destroy(&t->outbuf); - gpr_slice_buffer_destroy(&t->qbuf); + gpr_slice_buffer_destroy(&t->global.qbuf); + gpr_slice_buffer_destroy(&t->writing.outbuf); + gpr_slice_buffer_destroy(&t->parsing.qbuf); grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); @@ -509,12 +554,13 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, t->ping_counter = gpr_now().tv_nsec; grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx); grpc_chttp2_goaway_parser_init(&t->goaway_parser); - gpr_slice_buffer_init(&t->outbuf); - gpr_slice_buffer_init(&t->qbuf); + gpr_slice_buffer_init(&t->global.qbuf); + gpr_slice_buffer_init(&t->writing.outbuf); + gpr_slice_buffer_init(&t->parsing.qbuf); grpc_sopb_init(&t->nuke_later_sopb); grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context); if (is_client) { - gpr_slice_buffer_add(&t->qbuf, + gpr_slice_buffer_add(&t->global.qbuf, gpr_slice_from_copied_string(CLIENT_CONNECT_STRING)); } /* 8 is a random stab in the dark as to a good initial size: it's small enough @@ -575,16 +621,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, } gpr_mu_lock(&t->mu); - t->calling_back_channel = 1; + t->channel_callback.executing = 1; ref_transport(t); /* matches unref at end of this function */ gpr_mu_unlock(&t->mu); sr = setup(arg, &t->base, t->metadata_context); lock(t); - t->cb = sr.callbacks; - t->cb_user_data = sr.user_data; - t->calling_back_channel = 0; + t->channel_callback.cb = sr.callbacks; + t->channel_callback.cb_user_data = sr.user_data; + t->channel_callback.executing = 0; if (t->destroying) gpr_cv_signal(&t->cv); unlock(t); @@ -605,7 +651,7 @@ static void destroy_transport(grpc_transport *gt) { We need to be not writing as cancellation finalization may produce some callbacks that NEED to be made to close out some streams when t->writing becomes 0. */ - while (t->calling_back_channel || t->writing) { + while (t->channel_callback.executing || t->writing.executing) { gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); } drop_connection(t); @@ -618,7 +664,7 @@ static void destroy_transport(grpc_transport *gt) { It's shutdown path, so I don't believe an extra lock pair is going to be problematic for performance. */ lock(t); - GPR_ASSERT(!t->cb); + GPR_ASSERT(!t->channel_callback.cb); unlock(t); unref_transport(t); @@ -646,7 +692,7 @@ static void goaway(grpc_transport *gt, grpc_status_code status, lock(t); grpc_chttp2_goaway_append(t->last_incoming_stream_id, grpc_chttp2_grpc_status_to_http2_error(status), - debug_data, &t->qbuf); + debug_data, &t->global.qbuf); unlock(t); } @@ -806,6 +852,26 @@ static void remove_from_stream_map(transport *t, stream *s) { static void lock(transport *t) { gpr_mu_lock(&t->mu); } static void unlock(transport *t) { + unlock_ctx uctx; + size_t i; + + memset(&uctx, 0, sizeof(uctx)); + + unlock_check_writes(t, &uctx); + unlock_check_cancellations(t, &uctx); + unlock_check_parser(t, &uctx); + unlock_check_op_callbacks(t, &uctx); + unlock_check_channel_callbacks(t, &uctx); + + gpr_mu_unlock(&t->mu); + + for (i = 0; i < uctx.num_post_actions; i++) { + grpc_iomgr_closure* closure = uctx.post_actions[i]; + closure->cb(closure->cb_arg, 1); + } + + +#if 0 int start_write = 0; int perform_callbacks = 0; int call_closed = 0; @@ -814,7 +880,7 @@ static void unlock(transport *t) { pending_goaway *goaways = NULL; grpc_endpoint *ep = t->ep; grpc_stream_op_buffer nuke_now; - const grpc_transport_callbacks *cb = t->cb; + const grpc_transport_callbacks *cb = t->channel_callback.cb; GRPC_TIMER_BEGIN(GRPC_PTAG_HTTP2_UNLOCK, 0); @@ -824,18 +890,18 @@ static void unlock(transport *t) { } /* see if we need to trigger a write - and if so, get the data ready */ - if (ep && !t->writing) { - t->writing = start_write = prepare_write(t); + if (ep && !t->writing.executing) { + t->writing.executing = start_write = prepare_write(t); if (start_write) { ref_transport(t); } } - if (!t->writing) { + if (!t->writing.executing) { finalize_cancellations(t); } - if (!t->parsing) { + if (!t->parsing.executing) { finish_reads(t); } @@ -845,8 +911,8 @@ static void unlock(transport *t) { if (perform_callbacks) ref_transport(t); } - if (!t->calling_back_channel && cb) { - if (t->error_state == ERROR_STATE_SEEN && !t->writing) { + if (!t->channel_callback.executing && cb) { + if (t->error_state == ERROR_STATE_SEEN && !t->writing.executing) { call_closed = 1; t->calling_back_channel = 1; t->cb = NULL; /* no more callbacks */ @@ -906,6 +972,7 @@ static void unlock(transport *t) { gpr_free(goaways); GRPC_TIMER_END(GRPC_PTAG_HTTP2_UNLOCK, 0); +#endif } /* @@ -927,19 +994,22 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id, } } -static int prepare_write(transport *t) { +static void unlock_check_writes(transport *t, unlock_ctx *uctx) { stream *s; gpr_uint32 window_delta; - /* simple writes are queued to qbuf, and flushed here */ - if (!t->parsing) { - gpr_slice_buffer_swap(&t->qbuf, &t->outbuf); - GPR_ASSERT(t->qbuf.count == 0); + /* don't do anything if we are already writing */ + if (t->writing.executing) { + return; } + /* simple writes are queued to qbuf, and flushed here */ + gpr_slice_buffer_swap(&t->global.qbuf, &t->writing.outbuf); + GPR_ASSERT(t->global.qbuf.count == 0); + if (t->dirtied_local_settings && !t->sent_local_settings) { gpr_slice_buffer_add( - &t->outbuf, grpc_chttp2_settings_create( + &t->writing.outbuf, grpc_chttp2_settings_create( t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS], t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); t->force_send_settings = 0; @@ -980,7 +1050,7 @@ static int prepare_write(transport *t) { } } - if (!t->parsing) { + if (!t->parsing.executing) { /* for each stream that wants to update its window, add that window here */ while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) { window_delta = @@ -988,7 +1058,7 @@ static int prepare_write(transport *t) { s->incoming_window; if (!s->read_closed && window_delta) { gpr_slice_buffer_add( - &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); + &t->writing.outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); FLOWCTL_TRACE(t, s, incoming, s->id, window_delta); s->incoming_window += window_delta; } @@ -997,14 +1067,17 @@ static int prepare_write(transport *t) { /* if the transport is ready to send a window update, do so here also */ if (t->incoming_window < t->connection_window_target * 3 / 4) { window_delta = t->connection_window_target - t->incoming_window; - gpr_slice_buffer_add(&t->outbuf, + gpr_slice_buffer_add(&t->writing.outbuf, grpc_chttp2_window_update_create(0, window_delta)); FLOWCTL_TRACE(t, t, incoming, 0, window_delta); t->incoming_window += window_delta; } } - return t->outbuf.length > 0 || !stream_list_empty(t, WRITING); + if (t->writing.outbuf.length > 0) { + uctx->post_actions[uctx->num_post_actions++] = &t->writing.action; + t->writing.executing = 1; + } } static void finalize_outbuf(transport *t) { |