diff options
author | Craig Tiller <ctiller@google.com> | 2017-05-12 13:17:14 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-05-12 13:17:14 -0700 |
commit | e9a766593b33a18574cd3dfdaaea46a6dcdddc91 (patch) | |
tree | 54f1d15d38146aec770afef465cfe4aec78ffae0 /src/core | |
parent | 8ef68a570faebde6c79b80b3922a121f0540f342 (diff) |
Progress
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 38 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 6 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/writing.c | 16 |
3 files changed, 44 insertions, 16 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index e0aa3499c9..a9bc36b0d4 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -909,7 +909,7 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { grpc_chttp2_transport *t = gt; GPR_TIMER_BEGIN("write_action", 0); grpc_endpoint_write( - exec_ctx, t->ep, &t->outbuf, true, + exec_ctx, t->ep, &t->outbuf, t->write_is_covered, grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t, grpc_combiner_scheduler(t->combiner, false))); GPR_TIMER_END("write_action", 0); @@ -1192,8 +1192,12 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, cb->call_at_byte = notify_offset; cb->closure = s->fetching_send_message_finished; s->fetching_send_message_finished = NULL; - cb->next = s->on_write_finished_cbs; - s->on_write_finished_cbs = cb; + grpc_chttp2_write_cb **list = + s->fetching_send_message->flags & GRPC_WRITE_THROUGH + ? &s->on_write_finished_cbs + : &s->on_flow_controlled_cbs; + cb->next = *list; + *list = cb; } s->fetching_send_message = NULL; return; /* early out */ @@ -1873,6 +1877,21 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, return error; } +static void flush_write_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s, grpc_chttp2_write_cb **list, + grpc_error *error) { + while (*list) { + grpc_chttp2_write_cb *cb = *list; + *list = cb->next; + grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, + GRPC_ERROR_REF(error), + "on_write_finished_cb"); + cb->next = t->write_cb_pool; + t->write_cb_pool = cb; + } + GRPC_ERROR_UNREF(error); +} + void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { @@ -1892,16 +1911,9 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); - while (s->on_write_finished_cbs) { - grpc_chttp2_write_cb *cb = s->on_write_finished_cbs; - s->on_write_finished_cbs = cb->next; - grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, - GRPC_ERROR_REF(error), - "on_write_finished_cb"); - cb->next = t->write_cb_pool; - t->write_cb_pool = cb; - } - GRPC_ERROR_UNREF(error); + flush_write_list(exec_ctx, t, s, &s->on_write_finished_cbs, + GRPC_ERROR_REF(error)); + flush_write_list(exec_ctx, t, s, &s->on_flow_controlled_cbs, error); } void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 8d66e396ee..682c14be3a 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -421,6 +421,10 @@ struct grpc_chttp2_transport { bool keepalive_permit_without_calls; /** keep-alive state machine state */ grpc_chttp2_keepalive_state keepalive_state; + + /** is the next write covered by a poller? set in grpc_chttp2_begin_write, + read in write_action */ + bool write_is_covered; }; typedef enum { @@ -458,6 +462,7 @@ struct grpc_chttp2_stream { grpc_slice fetching_slice; int64_t next_message_end_offset; int64_t flow_controlled_bytes_written; + int64_t flow_controlled_bytes_flowed; bool complete_fetch_covered_by_poller; grpc_closure complete_fetch_locked; grpc_closure *fetching_send_message_finished; @@ -531,6 +536,7 @@ struct grpc_chttp2_stream { uint32_t announce_window; grpc_slice_buffer flow_controlled_buffer; + grpc_chttp2_write_cb *on_flow_controlled_cbs; grpc_chttp2_write_cb *on_write_finished_cbs; grpc_chttp2_write_cb *finish_after_write; size_t sending_bytes; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 5be1092946..92ee747e6b 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -138,10 +138,11 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, int64_t send_bytes, - grpc_chttp2_write_cb **list, grpc_error *error) { + grpc_chttp2_write_cb **list, int64_t *ctr, + grpc_error *error) { grpc_chttp2_write_cb *cb = *list; *list = NULL; - s->flow_controlled_bytes_written += send_bytes; + *ctr += send_bytes; while (cb) { grpc_chttp2_write_cb *next = cb->next; if (cb->call_at_byte <= s->flow_controlled_bytes_written) { @@ -228,6 +229,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( bool sent_initial_metadata = s->sent_initial_metadata; bool now_writing = false; + t->write_is_covered = false; GRPC_CHTTP2_IF_TRACING(gpr_log( GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t, @@ -259,6 +261,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( gpr_inf_past(GPR_CLOCK_MONOTONIC); t->ping_recv_state.ping_strikes = 0; } + t->write_is_covered = true; } /* send any window updates */ if (s->announce_window > 0) { @@ -320,11 +323,16 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } } s->sending_bytes += send_bytes; + update_list(exec_ctx, t, s, send_bytes, &s->on_flow_controlled_cbs, + &s->flow_controlled_bytes_flowed, GRPC_ERROR_NONE); now_writing = true; if (s->flow_controlled_buffer.length > 0) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } + if (s->on_write_finished_cbs != NULL) { + t->write_is_covered = true; + } } else if (t->outgoing_window == 0) { grpc_chttp2_list_add_stalled_by_transport(t, s); now_writing = true; @@ -364,6 +372,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); } now_writing = true; + t->write_is_covered = true; } } @@ -430,7 +439,8 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } if (s->sending_bytes != 0) { update_list(exec_ctx, t, s, (int64_t)s->sending_bytes, - &s->on_write_finished_cbs, GRPC_ERROR_REF(error)); + &s->on_write_finished_cbs, &s->flow_controlled_bytes_written, + GRPC_ERROR_REF(error)); s->sending_bytes = 0; } if (s->sent_trailing_metadata) { |