diff options
author | Craig Tiller <ctiller@google.com> | 2017-09-11 12:03:36 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-11 12:03:36 -0700 |
commit | 55c4b31389d5557b88d39bde6d783d68aa747de7 (patch) | |
tree | 57b63eb6f54ab024326468cfe36828eff3794cd9 /src | |
parent | 4a7e0594cc8434a6834ac61cb0b1198ac859affd (diff) | |
parent | 8e71287bd768dd9b8698363d062a817c39f0e07d (diff) |
Merge pull request #11758 from ctiller/write_completion
Write completion changes
Diffstat (limited to 'src')
19 files changed, 813 insertions, 209 deletions
diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.c b/src/core/ext/filters/client_channel/http_connect_handshaker.c index d4c3bc0ad3..418bb41ef6 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.c +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.c @@ -309,7 +309,7 @@ static void http_connect_handshaker_do_handshake( grpc_httpcli_request request; memset(&request, 0, sizeof(request)); request.host = server_name; - request.http.method = "CONNECT"; + request.http.method = (char*)"CONNECT"; request.http.path = server_name; request.http.hdrs = headers; request.http.hdr_count = num_headers; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index adf9846336..3fd701fe2f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -84,8 +84,6 @@ grpc_tracer_flag grpc_trace_chttp2_refcount = GRPC_TRACER_INITIALIZER(false, "chttp2_refcount"); #endif -static const grpc_transport_vtable vtable; - /* forward declarations of various callbacks that we'll build closures around */ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); @@ -248,6 +246,8 @@ void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } #endif +static const grpc_transport_vtable *get_vtable(void); + static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, const grpc_channel_args *channel_args, grpc_endpoint *ep, bool is_client) { @@ -257,7 +257,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); - t->base.vtable = &vtable; + t->base.vtable = get_vtable(); t->ep = ep; /* one ref is for destroy */ gpr_ref_init(&t->refs, 1); @@ -557,11 +557,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } - GRPC_CLOSURE_INIT(&t->write_action, write_action, t, - t->opt_target == GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT - ? grpc_executor_scheduler - : grpc_schedule_on_exec_ctx); - t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; t->ping_state.is_delayed_ping_timer_set = false; @@ -799,7 +794,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t, uint32_t id) { - return grpc_chttp2_stream_map_find(&t->stream_map, id); + return (grpc_chttp2_stream *)grpc_chttp2_stream_map_find(&t->stream_map, id); } grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx, @@ -858,6 +853,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, switch (t->write_state) { case GRPC_CHTTP2_WRITE_STATE_IDLE: set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason); + t->is_first_write_in_batch = true; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CLOSURE_SCHED( exec_ctx, @@ -876,46 +872,94 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } -void grpc_chttp2_become_writable( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_chttp2_stream_write_type stream_write_type, const char *reason) { +void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + bool also_initiate_write, const char *reason) { if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); } - switch (stream_write_type) { - case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK: - break; - case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED: - grpc_chttp2_initiate_write(exec_ctx, t, reason); - break; - case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED: - grpc_chttp2_initiate_write(exec_ctx, t, reason); - break; + if (also_initiate_write) { + grpc_chttp2_initiate_write(exec_ctx, t, reason); } } +static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t, + bool early_results_scheduled, + bool partial_write) { + /* if it's not the first write in a batch, always offload to the executor: + we'll probably end up queuing against the kernel anyway, so we'll likely + get better latency overall if we switch writing work elsewhere and continue + with application work above */ + if (!t->is_first_write_in_batch) { + return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); + } + /* equivalently, if it's a partial write, we *know* we're going to be taking a + thread jump to write it because of the above, may as well do so + immediately */ + if (partial_write) { + return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); + } + switch (t->opt_target) { + case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT: + /* executor gives us the largest probability of being able to batch a + * write with others on this transport */ + return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); + case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY: + return grpc_schedule_on_exec_ctx; + } + GPR_UNREACHABLE_CODE(return NULL); +} + +#define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i)) +static const char *begin_writing_desc(bool partial, bool inlined) { + switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) { + case WRITE_STATE_TUPLE_TO_INT(false, false): + return "begin write in background"; + case WRITE_STATE_TUPLE_TO_INT(false, true): + return "begin write in current thread"; + case WRITE_STATE_TUPLE_TO_INT(true, false): + return "begin partial write in background"; + case WRITE_STATE_TUPLE_TO_INT(true, true): + return "begin partial write in current thread"; + } + GPR_UNREACHABLE_CODE(return "bad state tuple"); +} + static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error_ignored) { GPR_TIMER_BEGIN("write_action_begin_locked", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); - switch (t->closed ? GRPC_CHTTP2_NOTHING_TO_WRITE - : grpc_chttp2_begin_write(exec_ctx, t)) { - case GRPC_CHTTP2_NOTHING_TO_WRITE: - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, - "begin writing nothing"); - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); - break; - case GRPC_CHTTP2_PARTIAL_WRITE: - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, - "begin writing partial"); - GRPC_CLOSURE_SCHED(exec_ctx, &t->write_action, GRPC_ERROR_NONE); - break; - case GRPC_CHTTP2_FULL_WRITE: - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, - "begin writing"); - GRPC_CLOSURE_SCHED(exec_ctx, &t->write_action, GRPC_ERROR_NONE); - break; + grpc_chttp2_begin_write_result r; + if (t->closed) { + r.writing = false; + } else { + r = grpc_chttp2_begin_write(exec_ctx, t); + } + if (r.writing) { + if (r.partial) { + GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx); + } + if (!t->is_first_write_in_batch) { + GRPC_STATS_INC_HTTP2_WRITES_CONTINUED(exec_ctx); + } + grpc_closure_scheduler *scheduler = + write_scheduler(t, r.early_results_scheduled, r.partial); + if (scheduler != grpc_schedule_on_exec_ctx) { + GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED(exec_ctx); + } + set_write_state( + exec_ctx, t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE + : GRPC_CHTTP2_WRITE_STATE_WRITING, + begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx)); + GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_INIT(&t->write_action, + write_action, t, scheduler), + GRPC_ERROR_NONE); + } else { + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, + "begin writing nothing"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); } GPR_TIMER_END("write_action_begin_locked", 0); } @@ -958,7 +1002,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: GPR_TIMER_MARK("state=writing_stale_no_poller", 0); set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, - "continue writing [!covered]"); + "continue writing"); + t->is_first_write_in_batch = false; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CLOSURE_RUN( exec_ctx, @@ -1060,9 +1105,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); post_destructive_reclaimer(exec_ctx, t); - grpc_chttp2_become_writable(exec_ctx, t, s, - GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, - "new_stream"); + grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream"); } /* cancel out streams that will never be started */ while (t->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -1111,12 +1154,14 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; if (GRPC_TRACER_ON(grpc_http_trace)) { const char *errstr = grpc_error_string(error); - gpr_log(GPR_DEBUG, - "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s", - closure, - (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT), - (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), - desc, errstr); + gpr_log( + GPR_DEBUG, + "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s " + "write_state=%s", + t, closure, + (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT), + (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), desc, + errstr, write_state_name(t->write_state)); } if (error != GRPC_ERROR_NONE) { if (closure->error_data.error == GRPC_ERROR_NONE) { @@ -1157,9 +1202,7 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length > t->write_buffer_size)) { - grpc_chttp2_become_writable(exec_ctx, t, s, - GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, - "op.send_message"); + grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); } } @@ -1198,8 +1241,12 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, cb->call_at_byte = notify_offset; cb->closure = s->fetching_send_message_finished; s->fetching_send_message_finished = NULL; - cb->next = s->on_write_finished_cbs; - s->on_write_finished_cbs = cb; + grpc_chttp2_write_cb **list = + s->fetching_send_message->flags & GRPC_WRITE_THROUGH + ? &s->on_write_finished_cbs + : &s->on_flow_controlled_cbs; + cb->next = *list; + *list = cb; } s->fetching_send_message = NULL; return; /* early out */ @@ -1357,14 +1404,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } else { GPR_ASSERT(s->id != 0); - grpc_chttp2_stream_write_type write_type = - GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; + bool initiate_write = true; if (op->send_message && (op->payload->send_message.send_message->flags & GRPC_WRITE_BUFFER_HINT)) { - write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK; + initiate_write = false; } - grpc_chttp2_become_writable(exec_ctx, t, s, write_type, + grpc_chttp2_become_writable(exec_ctx, t, s, initiate_write, "op.send_initial_metadata"); } } else { @@ -1473,8 +1519,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else if (s->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_become_writable(exec_ctx, t, s, - GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_trailing_metadata"); } } @@ -1999,6 +2044,21 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, return error; } +static void flush_write_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s, grpc_chttp2_write_cb **list, + grpc_error *error) { + while (*list) { + grpc_chttp2_write_cb *cb = *list; + *list = cb->next; + grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, + GRPC_ERROR_REF(error), + "on_write_finished_cb"); + cb->next = t->write_cb_pool; + t->write_cb_pool = cb; + } + GRPC_ERROR_UNREF(error); +} + void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { @@ -2018,16 +2078,9 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); - while (s->on_write_finished_cbs) { - grpc_chttp2_write_cb *cb = s->on_write_finished_cbs; - s->on_write_finished_cbs = cb->next; - grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, - GRPC_ERROR_REF(error), - "on_write_finished_cb"); - cb->next = t->write_cb_pool; - t->write_cb_pool = cb; - } - GRPC_ERROR_UNREF(error); + flush_write_list(exec_ctx, t, s, &s->on_write_finished_cbs, + GRPC_ERROR_REF(error)); + flush_write_list(exec_ctx, t, s, &s->on_flow_controlled_cbs, error); } void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, @@ -2271,13 +2324,11 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: break; case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_become_writable(exec_ctx, t, s, - GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + grpc_chttp2_become_writable(exec_ctx, t, s, true, "immediate stream flowctl"); break; case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: - grpc_chttp2_become_writable(exec_ctx, t, s, - GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK, + grpc_chttp2_become_writable(exec_ctx, t, s, false, "queue stream flowctl"); break; } @@ -2390,9 +2441,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (t->flow_control.initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { - grpc_chttp2_become_writable( - exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, - "unstalled"); + grpc_chttp2_become_writable(exec_ctx, t, s, true, "unstalled"); } } t->flow_control.initial_window_update = 0; @@ -2983,6 +3032,8 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), destroy_transport, chttp2_get_endpoint}; +static const grpc_transport_vtable *get_vtable(void) { return &vtable; } + grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, grpc_endpoint *ep, int is_client) { diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index dc166093a7..c94f7725bf 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -99,9 +99,8 @@ grpc_error *grpc_chttp2_window_update_parser_parse( grpc_chttp2_flowctl_recv_stream_update( &t->flow_control, &s->flow_control, received_update); if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { - grpc_chttp2_become_writable( - exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, - "stream.read_flow_control"); + grpc_chttp2_become_writable(exec_ctx, t, s, true, + "stream.read_flow_control"); } } } else { diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 9fff30d54f..0fbedd1e56 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -262,6 +262,10 @@ struct grpc_chttp2_transport { /** write execution state of the transport */ grpc_chttp2_write_state write_state; + /** is this the first write in a series of writes? + set when we initiate writing from idle, cleared when we + initiate writing from writing+more */ + bool is_first_write_in_batch; /** is the transport destroying itself? */ uint8_t destroying; @@ -483,6 +487,7 @@ struct grpc_chttp2_stream { grpc_slice fetching_slice; int64_t next_message_end_offset; int64_t flow_controlled_bytes_written; + int64_t flow_controlled_bytes_flowed; grpc_closure complete_fetch_locked; grpc_closure *fetching_send_message_finished; @@ -555,6 +560,7 @@ struct grpc_chttp2_stream { grpc_slice_buffer flow_controlled_buffer; + grpc_chttp2_write_cb *on_flow_controlled_cbs; grpc_chttp2_write_cb *on_write_finished_cbs; grpc_chttp2_write_cb *finish_after_write; size_t sending_bytes; @@ -595,10 +601,13 @@ struct grpc_chttp2_stream { void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, const char *reason); -typedef enum { - GRPC_CHTTP2_NOTHING_TO_WRITE, - GRPC_CHTTP2_PARTIAL_WRITE, - GRPC_CHTTP2_FULL_WRITE, +typedef struct { + /** are we writing? */ + bool writing; + /** if writing: was it a complete flush (false) or a partial flush (true) */ + bool partial; + /** did we queue any completions as part of beginning the write */ + bool early_results_scheduled; } grpc_chttp2_begin_write_result; grpc_chttp2_begin_write_result grpc_chttp2_begin_write( @@ -840,22 +849,12 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); -typedef enum { - /* don't initiate a transport write, but piggyback on the next one */ - GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK, - /* initiate a covered write */ - GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, - /* initiate an uncovered write */ - GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED -} grpc_chttp2_stream_write_type; - /** add a ref to the stream and add it to the writable list; ref will be dropped in writing.c */ void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_chttp2_stream_write_type type, - const char *reason); + bool also_initiate_write, const char *reason); void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 148d649a46..6c12c91365 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -106,7 +106,8 @@ grpc_error *grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx, return err; } ++cur; - ++t->deframe_state; + t->deframe_state = + (grpc_chttp2_deframe_transport_state)(1 + (int)t->deframe_state); } if (cur == end) { return GRPC_ERROR_NONE; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 5e9d97d485..fa224a49a4 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -123,15 +123,18 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, (t->ping_state.pings_before_data_required != 0); } -static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, +static bool update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, int64_t send_bytes, - grpc_chttp2_write_cb **list, grpc_error *error) { + grpc_chttp2_write_cb **list, int64_t *ctr, + grpc_error *error) { + bool sched_any = false; grpc_chttp2_write_cb *cb = *list; *list = NULL; - s->flow_controlled_bytes_written += send_bytes; + *ctr += send_bytes; while (cb) { grpc_chttp2_write_cb *next = cb->next; - if (cb->call_at_byte <= s->flow_controlled_bytes_written) { + if (cb->call_at_byte <= *ctr) { + sched_any = true; finish_write_cb(exec_ctx, t, s, cb, GRPC_ERROR_REF(error)); } else { add_to_write_list(list, cb); @@ -139,6 +142,7 @@ static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, cb = next; } GRPC_ERROR_UNREF(error); + return sched_any; } static bool stream_ref_if_not_destroyed(gpr_refcount *r) { @@ -164,6 +168,13 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_stream *s; + /* stats histogram counters: we increment these throughout this function, + and at the end publish to the central stats histograms */ + int flow_control_writes = 0; + int initial_metadata_writes = 0; + int trailing_metadata_writes = 0; + int message_writes = 0; + GRPC_STATS_INC_HTTP2_WRITES_BEGUN(exec_ctx); GPR_TIMER_BEGIN("grpc_chttp2_begin_write", 0); @@ -177,6 +188,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( t->force_send_settings = 0; t->dirtied_local_settings = 0; t->sent_local_settings = 1; + GRPC_STATS_INC_HTTP2_SETTINGS_WRITES(exec_ctx); } /* simple writes are queued to qbuf, and flushed here */ @@ -196,13 +208,13 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } } - bool partial_write = false; + grpc_chttp2_begin_write_result result = {false, false, false}; /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ while (true) { if (t->outbuf.length > target_write_size(t)) { - partial_write = true; + result.partial = true; break; } @@ -246,7 +258,6 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( .stats = &s->stats.outgoing}; grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor, NULL, 0, s->send_initial_metadata, &hopt, &t->outbuf); - now_writing = true; t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; if (!t->is_client) { @@ -254,6 +265,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( gpr_inf_past(GPR_CLOCK_MONOTONIC); t->ping_recv_state.ping_strikes = 0; } + initial_metadata_writes++; } else { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)")); @@ -269,10 +281,15 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( [num_extra_headers_for_trailing_metadata++] = &s->send_initial_metadata->idx.named.content_type->md; } + trailing_metadata_writes++; } s->send_initial_metadata = NULL; s->sent_initial_metadata = true; sent_initial_metadata = true; + result.early_results_scheduled = true; + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_NONE, + "send_initial_metadata_finished"); } /* send any window updates */ uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update( @@ -288,6 +305,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( gpr_inf_past(GPR_CLOCK_MONOTONIC); t->ping_recv_state.ping_strikes = 0; } + flow_control_writes++; } if (sent_initial_metadata) { /* send any body bytes, if allowed by flow control */ @@ -306,6 +324,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (max_outgoing > 0) { bool is_last_data_frame = false; bool is_last_frame = false; + size_t sending_bytes_before = s->sending_bytes; if (s->stream_compression_send_enabled) { while ((s->flow_controlled_buffer.length > 0 || s->compressed_data_buffer->length > 0) && @@ -373,6 +392,11 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( &s->stats.outgoing)); } } + result.early_results_scheduled |= + update_list(exec_ctx, t, s, + (int64_t)(s->sending_bytes - sending_bytes_before), + &s->on_flow_controlled_cbs, + &s->flow_controlled_bytes_flowed, GRPC_ERROR_NONE); now_writing = true; if (s->flow_controlled_buffer.length > 0 || (s->stream_compression_send_enabled && @@ -380,6 +404,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } + message_writes++; } else if (t->flow_control.remote_window == 0) { grpc_chttp2_list_add_stalled_by_transport(t, s); now_writing = true; @@ -415,6 +440,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( num_extra_headers_for_trailing_metadata, s->send_trailing_metadata, &hopt, &t->outbuf); + trailing_metadata_writes++; } s->send_trailing_metadata = NULL; s->sent_trailing_metadata = true; @@ -424,10 +450,22 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); } now_writing = true; + result.early_results_scheduled = true; + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->send_trailing_metadata_finished, + GRPC_ERROR_NONE, "send_trailing_metadata_finished"); } } if (now_writing) { + GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE( + exec_ctx, initial_metadata_writes); + GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(exec_ctx, message_writes); + GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE( + exec_ctx, trailing_metadata_writes); + GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(exec_ctx, + flow_control_writes); + if (!grpc_chttp2_list_add_writing_stream(t, s)) { /* already in writing list: drop ref */ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing"); @@ -465,9 +503,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( GPR_TIMER_END("grpc_chttp2_begin_write", 0); - return t->outbuf.count > 0 ? (partial_write ? GRPC_CHTTP2_PARTIAL_WRITE - : GRPC_CHTTP2_FULL_WRITE) - : GRPC_CHTTP2_NOTHING_TO_WRITE; + result.writing = t->outbuf.count > 0; + return result; } void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -476,20 +513,13 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_writing_stream(t, &s)) { - if (s->sent_initial_metadata) { - grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->send_initial_metadata_finished, - GRPC_ERROR_REF(error), "send_initial_metadata_finished"); - } if (s->sending_bytes != 0) { update_list(exec_ctx, t, s, (int64_t)s->sending_bytes, - &s->on_write_finished_cbs, GRPC_ERROR_REF(error)); + &s->on_write_finished_cbs, &s->flow_controlled_bytes_written, + GRPC_ERROR_REF(error)); s->sending_bytes = 0; } if (s->sent_trailing_metadata) { - grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->send_trailing_metadata_finished, - GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1, GRPC_ERROR_REF(error)); } diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c index 15ccaf21c4..3fd8ee38ef 100644 --- a/src/core/lib/debug/stats_data.c +++ b/src/core/lib/debug/stats_data.c @@ -30,6 +30,8 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "histogram_slow_lookups", "syscall_write", "syscall_read", + "tcp_backup_pollers_created", + "tcp_backup_poller_polls", "http2_op_batches", "http2_op_cancel", "http2_op_send_initial_metadata", @@ -38,16 +40,22 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "http2_op_recv_initial_metadata", "http2_op_recv_message", "http2_op_recv_trailing_metadata", + "http2_settings_writes", "http2_pings_sent", "http2_writes_begun", + "http2_writes_offloaded", + "http2_writes_continued", + "http2_partial_writes", "combiner_locks_initiated", "combiner_locks_scheduled_items", "combiner_locks_scheduled_final_items", "combiner_locks_offloaded", - "executor_scheduled_items", + "executor_scheduled_short_items", + "executor_scheduled_long_items", "executor_scheduled_to_self", "executor_wakeup_initiated", "executor_queue_drained", + "executor_push_retries", }; const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of client side calls created by this process", @@ -59,6 +67,8 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of write syscalls (or equivalent - eg sendmsg) made by this " "process", "Number of read syscalls (or equivalent - eg recvmsg) made by this process", + "Number of times a backup poller has been created (this can be expensive)", + "Number of polls performed on the backup poller", "Number of batches received by HTTP2 transport", "Number of cancelations received by HTTP2 transport", "Number of batches containing send initial metadata", @@ -67,20 +77,39 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of batches containing receive initial metadata", "Number of batches containing receive message", "Number of batches containing receive trailing metadata", - "Number of HTTP2 pings sent by process", "Number of HTTP2 writes initiated", + "Number of settings frames sent", "Number of HTTP2 pings sent by process", + "Number of HTTP2 writes initiated", + "Number of HTTP2 writes offloaded to the executor from application threads", + "Number of HTTP2 writes that finished seeing more data needed to be " + "written", + "Number of HTTP2 writes that were made knowing there was still more data " + "to be written (we cap maximum write size to syscall_write)", "Number of combiner lock entries by process (first items queued to a " "combiner)", "Number of items scheduled against combiner locks", "Number of final items scheduled against combiner locks", "Number of combiner locks offloaded to different threads", - "Number of closures scheduled against the executor (gRPC thread pool)", + "Number of finite runtime closures scheduled against the executor (gRPC " + "thread pool)", + "Number of potentially infinite runtime closures scheduled against the " + "executor (gRPC thread pool)", "Number of closures scheduled by the executor to the executor", "Number of thread wakeups initiated within the executor", "Number of times an executor queue was drained", + "Number of times we raced and were forced to retry pushing a closure to " + "the executor", }; const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { - "tcp_write_size", "tcp_write_iov_size", "tcp_read_size", - "tcp_read_offer", "tcp_read_offer_iov_size", "http2_send_message_size", + "tcp_write_size", + "tcp_write_iov_size", + "tcp_read_size", + "tcp_read_offer", + "tcp_read_offer_iov_size", + "http2_send_message_size", + "http2_send_initial_metadata_per_write", + "http2_send_message_per_write", + "http2_send_trailing_metadata_per_write", + "http2_send_flowctl_per_write", }; const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = { "Number of bytes offered to each syscall_write", @@ -89,6 +118,10 @@ const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = { "Number of bytes offered to each syscall_read", "Number of byte segments offered to each syscall_read", "Size of messages received by HTTP2 transport", + "Number of streams initiated written per TCP write", + "Number of streams whose payload was written per TCP write", + "Number of streams terminated per TCP write", + "Number of flow control updates written per TCP write", }; const int grpc_stats_table_0[65] = { 0, 1, 2, 3, 4, 6, 8, 11, @@ -273,15 +306,135 @@ void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, grpc_stats_histo_find_bucket_slow( (exec_ctx), value, grpc_stats_table_0, 64)); } -const int grpc_stats_histo_buckets[6] = {64, 64, 64, 64, 64, 64}; -const int grpc_stats_histo_start[6] = {0, 64, 128, 192, 256, 320}; -const int *const grpc_stats_histo_bucket_boundaries[6] = { +void grpc_stats_inc_http2_send_initial_metadata_per_write( + grpc_exec_ctx *exec_ctx, int value) { + value = GPR_CLAMP(value, 0, 1024); + if (value < 13) { + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE, + value); + return; + } + union { + double dbl; + uint64_t uint; + } _val, _bkt; + _val.dbl = value; + if (_val.uint < 4637863191261478912ull) { + int bucket = + grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13; + _bkt.dbl = grpc_stats_table_2[bucket]; + bucket -= (_val.uint < _bkt.uint); + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE, + bucket); + return; + } + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE, + grpc_stats_histo_find_bucket_slow((exec_ctx), value, grpc_stats_table_2, + 64)); +} +void grpc_stats_inc_http2_send_message_per_write(grpc_exec_ctx *exec_ctx, + int value) { + value = GPR_CLAMP(value, 0, 1024); + if (value < 13) { + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE, value); + return; + } + union { + double dbl; + uint64_t uint; + } _val, _bkt; + _val.dbl = value; + if (_val.uint < 4637863191261478912ull) { + int bucket = + grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13; + _bkt.dbl = grpc_stats_table_2[bucket]; + bucket -= (_val.uint < _bkt.uint); + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE, bucket); + return; + } + GRPC_STATS_INC_HISTOGRAM((exec_ctx), + GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE, + grpc_stats_histo_find_bucket_slow( + (exec_ctx), value, grpc_stats_table_2, 64)); +} +void grpc_stats_inc_http2_send_trailing_metadata_per_write( + grpc_exec_ctx *exec_ctx, int value) { + value = GPR_CLAMP(value, 0, 1024); + if (value < 13) { + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE, + value); + return; + } + union { + double dbl; + uint64_t uint; + } _val, _bkt; + _val.dbl = value; + if (_val.uint < 4637863191261478912ull) { + int bucket = + grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13; + _bkt.dbl = grpc_stats_table_2[bucket]; + bucket -= (_val.uint < _bkt.uint); + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE, + bucket); + return; + } + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE, + grpc_stats_histo_find_bucket_slow((exec_ctx), value, grpc_stats_table_2, + 64)); +} +void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx, + int value) { + value = GPR_CLAMP(value, 0, 1024); + if (value < 13) { + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE, value); + return; + } + union { + double dbl; + uint64_t uint; + } _val, _bkt; + _val.dbl = value; + if (_val.uint < 4637863191261478912ull) { + int bucket = + grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13; + _bkt.dbl = grpc_stats_table_2[bucket]; + bucket -= (_val.uint < _bkt.uint); + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE, bucket); + return; + } + GRPC_STATS_INC_HISTOGRAM((exec_ctx), + GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE, + grpc_stats_histo_find_bucket_slow( + (exec_ctx), value, grpc_stats_table_2, 64)); +} +const int grpc_stats_histo_buckets[10] = {64, 64, 64, 64, 64, + 64, 64, 64, 64, 64}; +const int grpc_stats_histo_start[10] = {0, 64, 128, 192, 256, + 320, 384, 448, 512, 576}; +const int *const grpc_stats_histo_bucket_boundaries[10] = { + grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0, grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0, - grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0}; -void (*const grpc_stats_inc_histogram[6])(grpc_exec_ctx *exec_ctx, int x) = { + grpc_stats_table_2, grpc_stats_table_2, grpc_stats_table_2, + grpc_stats_table_2}; +void (*const grpc_stats_inc_histogram[10])(grpc_exec_ctx *exec_ctx, int x) = { grpc_stats_inc_tcp_write_size, grpc_stats_inc_tcp_write_iov_size, grpc_stats_inc_tcp_read_size, grpc_stats_inc_tcp_read_offer, grpc_stats_inc_tcp_read_offer_iov_size, - grpc_stats_inc_http2_send_message_size}; + grpc_stats_inc_http2_send_message_size, + grpc_stats_inc_http2_send_initial_metadata_per_write, + grpc_stats_inc_http2_send_message_per_write, + grpc_stats_inc_http2_send_trailing_metadata_per_write, + grpc_stats_inc_http2_send_flowctl_per_write}; diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index 3151e5ab5c..b7c15c08a5 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -32,6 +32,8 @@ typedef enum { GRPC_STATS_COUNTER_HISTOGRAM_SLOW_LOOKUPS, GRPC_STATS_COUNTER_SYSCALL_WRITE, GRPC_STATS_COUNTER_SYSCALL_READ, + GRPC_STATS_COUNTER_TCP_BACKUP_POLLERS_CREATED, + GRPC_STATS_COUNTER_TCP_BACKUP_POLLER_POLLS, GRPC_STATS_COUNTER_HTTP2_OP_BATCHES, GRPC_STATS_COUNTER_HTTP2_OP_CANCEL, GRPC_STATS_COUNTER_HTTP2_OP_SEND_INITIAL_METADATA, @@ -40,16 +42,22 @@ typedef enum { GRPC_STATS_COUNTER_HTTP2_OP_RECV_INITIAL_METADATA, GRPC_STATS_COUNTER_HTTP2_OP_RECV_MESSAGE, GRPC_STATS_COUNTER_HTTP2_OP_RECV_TRAILING_METADATA, + GRPC_STATS_COUNTER_HTTP2_SETTINGS_WRITES, GRPC_STATS_COUNTER_HTTP2_PINGS_SENT, GRPC_STATS_COUNTER_HTTP2_WRITES_BEGUN, + GRPC_STATS_COUNTER_HTTP2_WRITES_OFFLOADED, + GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED, + GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES, GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED, GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS, GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS, GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED, - GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_ITEMS, + GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS, + GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_LONG_ITEMS, GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF, GRPC_STATS_COUNTER_EXECUTOR_WAKEUP_INITIATED, GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED, + GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES, GRPC_STATS_COUNTER_COUNT } grpc_stats_counters; extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT]; @@ -61,6 +69,10 @@ typedef enum { GRPC_STATS_HISTOGRAM_TCP_READ_OFFER, GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE, GRPC_STATS_HISTOGRAM_COUNT } grpc_stats_histograms; extern const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT]; @@ -78,7 +90,15 @@ typedef enum { GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_BUCKETS = 64, GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_FIRST_SLOT = 320, GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_BUCKETS = 64, - GRPC_STATS_HISTOGRAM_BUCKETS = 384 + GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE_FIRST_SLOT = 384, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE_FIRST_SLOT = 448, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE_FIRST_SLOT = 512, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_FIRST_SLOT = 576, + GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_BUCKETS = 640 } grpc_stats_histogram_constants; #define GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED) @@ -94,6 +114,11 @@ typedef enum { GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_WRITE) #define GRPC_STATS_INC_SYSCALL_READ(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_READ) +#define GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_TCP_BACKUP_POLLERS_CREATED) +#define GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_TCP_BACKUP_POLLER_POLLS) #define GRPC_STATS_INC_HTTP2_OP_BATCHES(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_OP_BATCHES) #define GRPC_STATS_INC_HTTP2_OP_CANCEL(exec_ctx) \ @@ -114,10 +139,18 @@ typedef enum { #define GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_HTTP2_OP_RECV_TRAILING_METADATA) +#define GRPC_STATS_INC_HTTP2_SETTINGS_WRITES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_SETTINGS_WRITES) #define GRPC_STATS_INC_HTTP2_PINGS_SENT(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_PINGS_SENT) #define GRPC_STATS_INC_HTTP2_WRITES_BEGUN(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_WRITES_BEGUN) +#define GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_WRITES_OFFLOADED) +#define GRPC_STATS_INC_HTTP2_WRITES_CONTINUED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED) +#define GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES) #define GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED) @@ -130,9 +163,12 @@ typedef enum { #define GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED) -#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_ITEMS(exec_ctx) \ - GRPC_STATS_INC_COUNTER((exec_ctx), \ - GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_ITEMS) +#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS) +#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_LONG_ITEMS) #define GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF) @@ -141,6 +177,8 @@ typedef enum { GRPC_STATS_COUNTER_EXECUTOR_WAKEUP_INITIATED) #define GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED) +#define GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES) #define GRPC_STATS_INC_TCP_WRITE_SIZE(exec_ctx, value) \ grpc_stats_inc_tcp_write_size((exec_ctx), (int)(value)) void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int x); @@ -159,10 +197,27 @@ void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, int x); #define GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(exec_ctx, value) \ grpc_stats_inc_http2_send_message_size((exec_ctx), (int)(value)) void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, int x); -extern const int grpc_stats_histo_buckets[6]; -extern const int grpc_stats_histo_start[6]; -extern const int *const grpc_stats_histo_bucket_boundaries[6]; -extern void (*const grpc_stats_inc_histogram[6])(grpc_exec_ctx *exec_ctx, +#define GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(exec_ctx, value) \ + grpc_stats_inc_http2_send_initial_metadata_per_write((exec_ctx), (int)(value)) +void grpc_stats_inc_http2_send_initial_metadata_per_write( + grpc_exec_ctx *exec_ctx, int x); +#define GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(exec_ctx, value) \ + grpc_stats_inc_http2_send_message_per_write((exec_ctx), (int)(value)) +void grpc_stats_inc_http2_send_message_per_write(grpc_exec_ctx *exec_ctx, + int x); +#define GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(exec_ctx, value) \ + grpc_stats_inc_http2_send_trailing_metadata_per_write((exec_ctx), \ + (int)(value)) +void grpc_stats_inc_http2_send_trailing_metadata_per_write( + grpc_exec_ctx *exec_ctx, int x); +#define GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(exec_ctx, value) \ + grpc_stats_inc_http2_send_flowctl_per_write((exec_ctx), (int)(value)) +void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx, int x); +extern const int grpc_stats_histo_buckets[10]; +extern const int grpc_stats_histo_start[10]; +extern const int *const grpc_stats_histo_bucket_boundaries[10]; +extern void (*const grpc_stats_inc_histogram[10])(grpc_exec_ctx *exec_ctx, + int x); #endif /* GRPC_CORE_LIB_DEBUG_STATS_DATA_H */ diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index 53f6ff0074..a9d71f4fcb 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -54,6 +54,10 @@ max: 1024 buckets: 64 doc: Number of byte segments offered to each syscall_read +- counter: tcp_backup_pollers_created + doc: Number of times a backup poller has been created (this can be expensive) +- counter: tcp_backup_poller_polls + doc: Number of polls performed on the backup poller # chttp2 - counter: http2_op_batches doc: Number of batches received by HTTP2 transport @@ -75,10 +79,36 @@ max: 16777216 buckets: 64 doc: Size of messages received by HTTP2 transport +- histogram: http2_send_initial_metadata_per_write + max: 1024 + buckets: 64 + doc: Number of streams initiated written per TCP write +- histogram: http2_send_message_per_write + max: 1024 + buckets: 64 + doc: Number of streams whose payload was written per TCP write +- histogram: http2_send_trailing_metadata_per_write + max: 1024 + buckets: 64 + doc: Number of streams terminated per TCP write +- histogram: http2_send_flowctl_per_write + max: 1024 + buckets: 64 + doc: Number of flow control updates written per TCP write +- counter: http2_settings_writes + doc: Number of settings frames sent - counter: http2_pings_sent doc: Number of HTTP2 pings sent by process - counter: http2_writes_begun doc: Number of HTTP2 writes initiated +- counter: http2_writes_offloaded + doc: Number of HTTP2 writes offloaded to the executor from application threads +- counter: http2_writes_continued + doc: Number of HTTP2 writes that finished seeing more data needed to be + written +- counter: http2_partial_writes + doc: Number of HTTP2 writes that were made knowing there was still more data + to be written (we cap maximum write size to syscall_write) # combiner locks - counter: combiner_locks_initiated doc: Number of combiner lock entries by process @@ -90,11 +120,18 @@ - counter: combiner_locks_offloaded doc: Number of combiner locks offloaded to different threads # executor -- counter: executor_scheduled_items - doc: Number of closures scheduled against the executor (gRPC thread pool) +- counter: executor_scheduled_short_items + doc: Number of finite runtime closures scheduled against the executor + (gRPC thread pool) +- counter: executor_scheduled_long_items + doc: Number of potentially infinite runtime closures scheduled against the + executor (gRPC thread pool) - counter: executor_scheduled_to_self doc: Number of closures scheduled by the executor to the executor - counter: executor_wakeup_initiated doc: Number of thread wakeups initiated within the executor - counter: executor_queue_drained doc: Number of times an executor queue was drained +- counter: executor_push_retries + doc: Number of times we raced and were forced to retry pushing a closure to + the executor diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 3d42f6d920..360967f3ba 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -81,7 +81,8 @@ grpc_combiner *grpc_combiner_create(void) { gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); - GRPC_CLOSURE_INIT(&lock->offload, offload, lock, grpc_executor_scheduler); + GRPC_CLOSURE_INIT(&lock->offload, offload, lock, + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 6a083f8ade..bcf1d9001b 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -989,6 +989,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, r = grpc_poll_function(pfds, pfd_count, timeout); GRPC_SCHEDULING_END_BLOCKING_REGION; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r); + } + if (r < 0) { if (errno != EINTR) { work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); @@ -1009,6 +1013,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { if (pfds[0].revents & POLLIN_CHECK) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "%p: got_wakeup", pollset); + } work_combine_error( &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd)); } @@ -1016,6 +1023,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (watchers[i].fd == NULL) { fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "%p got_event: %d r:%d w:%d [%d]", pollset, + pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0, + (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents); + } fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, pfds[i].revents & POLLOUT_CHECK, pollset); } diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index eb8d55678a..892385d7d7 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -40,6 +40,7 @@ typedef struct { grpc_closure_list elems; size_t depth; bool shutdown; + bool queued_long_job; gpr_thd_id id; } thread_state; @@ -50,6 +51,9 @@ static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER; GPR_TLS_DECL(g_this_thread_state); +static grpc_tracer_flag executor_trace = + GRPC_TRACER_INITIALIZER(false, "executor"); + static void executor_thread(void *arg); static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { @@ -59,6 +63,14 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { while (c != NULL) { grpc_closure *next = c->next_data.next; grpc_error *error = c->error_data.error; + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, + c->file_created, c->line_created); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); +#endif + } #ifndef NDEBUG c->scheduled = false; #endif @@ -66,6 +78,7 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { GRPC_ERROR_UNREF(error); c = next; n++; + grpc_exec_ctx_flush(exec_ctx); } return n; @@ -121,6 +134,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { } void grpc_executor_init(grpc_exec_ctx *exec_ctx) { + grpc_register_tracer(&executor_trace); gpr_atm_no_barrier_store(&g_cur_threads, 0); grpc_executor_set_threading(exec_ctx, true); } @@ -138,12 +152,21 @@ static void executor_thread(void *arg) { size_t subtract_depth = 0; for (;;) { + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")", + (int)(ts - g_thread_state), subtract_depth); + } gpr_mu_lock(&ts->mu); ts->depth -= subtract_depth; while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { + ts->queued_long_job = false; gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } if (ts->shutdown) { + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: shutdown", + (int)(ts - g_thread_state)); + } gpr_mu_unlock(&ts->mu); break; } @@ -151,52 +174,128 @@ static void executor_thread(void *arg) { grpc_closure_list exec = ts->elems; ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state)); + } subtract_depth = run_closures(&exec_ctx, exec); - grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); } static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); - GRPC_STATS_INC_EXECUTOR_SCHEDULED_ITEMS(exec_ctx); - if (cur_thread_count == 0) { - grpc_closure_list_append(&exec_ctx->closure_list, closure, error); - return; - } - thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state); - if (ts == NULL) { - ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; + grpc_error *error, bool is_short) { + bool retry_push; + if (is_short) { + GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx); } else { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); - } - gpr_mu_lock(&ts->mu); - if (grpc_closure_list_empty(ts->elems)) { - GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx); - gpr_cv_signal(&ts->cv); + GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(exec_ctx); } - grpc_closure_list_append(&ts->elems, closure, error); - ts->depth++; - bool try_new_thread = ts->depth > MAX_DEPTH && - cur_thread_count < g_max_threads && !ts->shutdown; - gpr_mu_unlock(&ts->mu); - if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) { - cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); - if (cur_thread_count < g_max_threads) { - gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); - - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, - &g_thread_state[cur_thread_count], &opt); + do { + retry_push = false; + size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count == 0) { + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline", + closure, closure->file_created, closure->line_created); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure); +#endif + } + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + return; } - gpr_spinlock_unlock(&g_adding_thread_lock); - } + thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state); + if (ts == NULL) { + ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; + } else { + GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); + } + thread_state *orig_ts = ts; + + bool try_new_thread; + for (;;) { + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log( + GPR_DEBUG, + "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d", + closure, is_short ? "short" : "long", closure->file_created, + closure->line_created, (int)(ts - g_thread_state)); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: try to schedule %p (%s) to thread %d", + closure, is_short ? "short" : "long", + (int)(ts - g_thread_state)); +#endif + } + gpr_mu_lock(&ts->mu); + if (ts->queued_long_job) { + // if there's a long job queued, we never queue anything else to this + // queue (since long jobs can take 'infinite' time and we need to + // guarantee no starvation) + // ... spin through queues and try again + gpr_mu_unlock(&ts->mu); + size_t idx = (size_t)(ts - g_thread_state); + ts = &g_thread_state[(idx + 1) % cur_thread_count]; + if (ts == orig_ts) { + retry_push = true; + try_new_thread = true; + break; + } + continue; + } + if (grpc_closure_list_empty(ts->elems)) { + GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx); + gpr_cv_signal(&ts->cv); + } + grpc_closure_list_append(&ts->elems, closure, error); + ts->depth++; + try_new_thread = ts->depth > MAX_DEPTH && + cur_thread_count < g_max_threads && !ts->shutdown; + if (!is_short) ts->queued_long_job = true; + gpr_mu_unlock(&ts->mu); + break; + } + if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) { + cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count < g_max_threads) { + gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); + + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, + &g_thread_state[cur_thread_count], &opt); + } + gpr_spinlock_unlock(&g_adding_thread_lock); + } + if (retry_push) { + GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx); + } + } while (retry_push); } -static const grpc_closure_scheduler_vtable executor_vtable = { - executor_push, executor_push, "executor"}; -static grpc_closure_scheduler executor_scheduler = {&executor_vtable}; -grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler; +static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, true); +} + +static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, false); +} + +static const grpc_closure_scheduler_vtable executor_vtable_short = { + executor_push_short, executor_push_short, "executor"}; +static grpc_closure_scheduler executor_scheduler_short = { + &executor_vtable_short}; + +static const grpc_closure_scheduler_vtable executor_vtable_long = { + executor_push_long, executor_push_long, "executor"}; +static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long}; + +grpc_closure_scheduler *grpc_executor_scheduler( + grpc_executor_job_length length) { + return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short + : &executor_scheduler_long; +} diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index c3382a0a12..0412c02790 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -21,6 +21,11 @@ #include "src/core/lib/iomgr/closure.h" +typedef enum { + GRPC_EXECUTOR_SHORT, + GRPC_EXECUTOR_LONG +} grpc_executor_job_length; + /** Initialize the global executor. * * This mechanism is meant to outsource work (grpc_closure instances) to a @@ -28,7 +33,7 @@ * non-blocking solution available. */ void grpc_executor_init(grpc_exec_ctx *exec_ctx); -extern grpc_closure_scheduler *grpc_executor_scheduler; +grpc_closure_scheduler *grpc_executor_scheduler(grpc_executor_job_length); /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx); diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index b515b8f1e6..082e3b7947 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -177,7 +177,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_resolved_addresses **addrs) { request *r = (request *)gpr_malloc(sizeof(request)); GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, - grpc_executor_scheduler); + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 45cfd7248d..0cb0029f4e 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -159,7 +159,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_resolved_addresses **addresses) { request *r = gpr_malloc(sizeof(request)); GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, - grpc_executor_scheduler); + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index ba0be9116f..7e271294fd 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -43,6 +43,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -90,8 +91,8 @@ typedef struct { grpc_closure *release_fd_cb; int *release_fd; - grpc_closure read_closure; - grpc_closure write_closure; + grpc_closure read_done_closure; + grpc_closure write_done_closure; char *peer_string; @@ -99,6 +100,148 @@ typedef struct { grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; +typedef struct backup_poller { + gpr_mu *pollset_mu; + grpc_closure run_poller; +} backup_poller; + +#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset *)((b) + 1)) + +static gpr_atm g_uncovered_notifications_pending; +static gpr_atm g_backup_poller; /* backup_poller* */ + +static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + grpc_error *error); +static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + grpc_error *error); +static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx *exec_ctx, + void *arg /* grpc_tcp */, + grpc_error *error); + +static void done_poller(grpc_exec_ctx *exec_ctx, void *bp, + grpc_error *error_ignored) { + backup_poller *p = (backup_poller *)bp; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p destroy", p); + } + grpc_pollset_destroy(exec_ctx, BACKUP_POLLER_POLLSET(p)); + gpr_free(p); +} + +static void run_poller(grpc_exec_ctx *exec_ctx, void *bp, + grpc_error *error_ignored) { + backup_poller *p = (backup_poller *)bp; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); + } + gpr_mu_lock(p->pollset_mu); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec deadline = + gpr_time_add(now, gpr_time_from_seconds(10, GPR_TIMESPAN)); + GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx); + GRPC_LOG_IF_ERROR("backup_poller:pollset_work", + grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, + now, deadline)); + gpr_mu_unlock(p->pollset_mu); + /* last "uncovered" notification is the ref that keeps us polling, if we get + * there try a cas to release it */ + if (gpr_atm_no_barrier_load(&g_uncovered_notifications_pending) == 1 && + gpr_atm_full_cas(&g_uncovered_notifications_pending, 1, 0)) { + gpr_mu_lock(p->pollset_mu); + bool cas_ok = gpr_atm_full_cas(&g_backup_poller, (gpr_atm)p, 0); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok); + } + gpr_mu_unlock(p->pollset_mu); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p); + } + grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p), + GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p, + grpc_schedule_on_exec_ctx)); + } else { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p); + } + GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE); + } +} + +static void drop_uncovered(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + backup_poller *p = (backup_poller *)gpr_atm_acq_load(&g_backup_poller); + gpr_atm old_count = + gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p uncover cnt %d->%d", p, (int)old_count, + (int)old_count - 1); + } + GPR_ASSERT(old_count != 1); +} + +static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + backup_poller *p; + gpr_atm old_count = + gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER: cover cnt %d->%d", (int)old_count, + 2 + (int)old_count); + } + if (old_count == 0) { + GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx); + p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size()); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p); + } + grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); + gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p); + GRPC_CLOSURE_SCHED( + exec_ctx, + GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)), + GRPC_ERROR_NONE); + } else { + while ((p = (backup_poller *)gpr_atm_acq_load(&g_backup_poller)) == NULL) { + // spin waiting for backup poller + } + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp); + } + grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd); + if (old_count != 0) { + drop_uncovered(exec_ctx, tcp); + } +} + +static void notify_on_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp); + } + GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_done_closure); +} + +static void notify_on_write(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp); + } + cover_self(exec_ctx, tcp); + GRPC_CLOSURE_INIT(&tcp->write_done_closure, + tcp_drop_uncovered_then_handle_write, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_done_closure); +} + +static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error)); + } + drop_uncovered(exec_ctx, (grpc_tcp *)arg); + tcp_handle_write(exec_ctx, arg, error); +} + static void add_to_estimate(grpc_tcp *tcp, size_t bytes) { tcp->bytes_read_this_round += (double)bytes; } @@ -214,6 +357,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, grpc_closure *cb = tcp->read_cb; if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); size_t i; const char *str = grpc_error_string(error); gpr_log(GPR_DEBUG, "read: error=%s", str); @@ -271,7 +415,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (errno == EAGAIN) { finish_estimate(tcp); /* We've consumed the edge, request a new one */ - grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); + notify_on_read(exec_ctx, tcp); } else { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); @@ -308,6 +452,10 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)tcpp; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp, + grpc_error_string(error)); + } if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); grpc_slice_buffer_reset_and_unref_internal(exec_ctx, @@ -323,9 +471,15 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { size_t target_read_size = get_target_read_size(tcp); if (tcp->incoming_buffer->length < target_read_size && tcp->incoming_buffer->count < MAX_READ_IOVEC) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p alloc_slices", tcp); + } grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, target_read_size, 1, tcp->incoming_buffer); } else { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p do_read", tcp); + } tcp_do_read(exec_ctx, tcp); } } @@ -334,6 +488,9 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p got_read: %s", tcp, grpc_error_string(error)); + } if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); @@ -357,9 +514,9 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = false; - grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); + notify_on_read(exec_ctx, tcp); } else { - GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_done_closure, GRPC_ERROR_NONE); } } @@ -472,7 +629,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "write: delayed"); } - grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); + notify_on_write(exec_ctx, tcp); } else { cb = tcp->write_cb; tcp->write_cb = NULL; @@ -525,7 +682,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "write: delayed"); } - grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); + notify_on_write(exec_ctx, tcp); } else { if (GRPC_TRACER_ON(grpc_tcp_trace)) { const char *str = grpc_error_string(error); @@ -602,7 +759,7 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd, strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { grpc_resource_quota_unref_internal(exec_ctx, resource_quota); resource_quota = grpc_resource_quota_ref_internal( - channel_args->args[i].value.pointer.p); + (grpc_resource_quota *)channel_args->args[i].value.pointer.p); } } } @@ -631,10 +788,6 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd, gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); tcp->em_fd = em_fd; - GRPC_CLOSURE_INIT(&tcp->read_closure, tcp_handle_read, tcp, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&tcp->write_closure, tcp_handle_write, tcp, - grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&tcp->last_read_buffer); tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); grpc_resource_user_slice_allocator_init( diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index ea9608f444..975d599523 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -128,13 +128,11 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx, GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, error); } -static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - security_handshaker *h = arg; - gpr_mu_lock(&h->mu); +static void on_peer_checked_inner(grpc_exec_ctx *exec_ctx, + security_handshaker *h, grpc_error *error) { if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error)); - goto done; + return; } // Create zero-copy frame protector, if implemented. tsi_zero_copy_grpc_protector *zero_copy_protector = NULL; @@ -146,7 +144,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, "Zero-copy frame protector creation failed"), result); security_handshake_failed_locked(exec_ctx, h, error); - goto done; + return; } // Create frame protector if zero-copy frame protector is NULL. tsi_frame_protector *protector = NULL; @@ -158,7 +156,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, "Frame protector creation failed"), result); security_handshake_failed_locked(exec_ctx, h, error); - goto done; + return; } } // Get unused bytes. @@ -192,7 +190,13 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, // Set shutdown to true so that subsequent calls to // security_handshaker_shutdown() do nothing. h->shutdown = true; -done: +} + +static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + security_handshaker *h = (security_handshaker *)arg; + gpr_mu_lock(&h->mu); + on_peer_checked_inner(exec_ctx, h, error); gpr_mu_unlock(&h->mu); security_handshaker_unref(exec_ctx, h); } @@ -254,7 +258,7 @@ static grpc_error *on_handshake_next_done_locked( static void on_handshake_next_done_grpc_wrapper( tsi_result result, void *user_data, const unsigned char *bytes_to_send, size_t bytes_to_send_size, tsi_handshaker_result *handshaker_result) { - security_handshaker *h = user_data; + security_handshaker *h = (security_handshaker *)user_data; // This callback will be invoked by TSI in a non-grpc thread, so it's // safe to create our own exec_ctx here. grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -296,7 +300,7 @@ static grpc_error *do_handshaker_next_locked( static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - security_handshaker *h = arg; + security_handshaker *h = (security_handshaker *)arg; gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( @@ -313,7 +317,8 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, bytes_received_size += GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]); } if (bytes_received_size > h->handshake_buffer_size) { - h->handshake_buffer = gpr_realloc(h->handshake_buffer, bytes_received_size); + h->handshake_buffer = + (uint8_t *)gpr_realloc(h->handshake_buffer, bytes_received_size); h->handshake_buffer_size = bytes_received_size; } size_t offset = 0; @@ -338,7 +343,7 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - security_handshaker *h = arg; + security_handshaker *h = (security_handshaker *)arg; gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( @@ -415,14 +420,15 @@ static const grpc_handshaker_vtable security_handshaker_vtable = { static grpc_handshaker *security_handshaker_create( grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker, grpc_security_connector *connector) { - security_handshaker *h = gpr_zalloc(sizeof(security_handshaker)); + security_handshaker *h = + (security_handshaker *)gpr_zalloc(sizeof(security_handshaker)); grpc_handshaker_init(&security_handshaker_vtable, &h->base); h->handshaker = handshaker; h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake"); gpr_mu_init(&h->mu); gpr_ref_init(&h->refs, 1); h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; - h->handshake_buffer = gpr_malloc(h->handshake_buffer_size); + h->handshake_buffer = (uint8_t *)gpr_malloc(h->handshake_buffer_size); GRPC_CLOSURE_INIT(&h->on_handshake_data_sent_to_peer, on_handshake_data_sent_to_peer, h, grpc_schedule_on_exec_ctx); @@ -465,7 +471,7 @@ static const grpc_handshaker_vtable fail_handshaker_vtable = { fail_handshaker_do_handshake}; static grpc_handshaker *fail_handshaker_create() { - grpc_handshaker *h = gpr_malloc(sizeof(*h)); + grpc_handshaker *h = (grpc_handshaker *)gpr_malloc(sizeof(*h)); grpc_handshaker_init(&fail_handshaker_vtable, h); return h; } diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index a8eb1b0425..f95cc10a6a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1117,9 +1117,11 @@ void grpc_server_start(grpc_server *server) { server_ref(server); server->starting = true; - GRPC_CLOSURE_SCHED(&exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server, - grpc_executor_scheduler), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + &exec_ctx, + GRPC_CLOSURE_CREATE(start_listeners, server, + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)), + GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 5f6302ad00..caa11a956e 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -72,7 +72,8 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, cope with. Throw this over to the executor (on a core-owned thread) and process it there. */ - refcount->destroy.scheduler = grpc_executor_scheduler; + refcount->destroy.scheduler = + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); } GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE); } |