aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-09-11 12:03:36 -0700
committerGravatar GitHub <noreply@github.com>2017-09-11 12:03:36 -0700
commit55c4b31389d5557b88d39bde6d783d68aa747de7 (patch)
tree57b63eb6f54ab024326468cfe36828eff3794cd9 /src/core
parent4a7e0594cc8434a6834ac61cb0b1198ac859affd (diff)
parent8e71287bd768dd9b8698363d062a817c39f0e07d (diff)
Merge pull request #11758 from ctiller/write_completion
Write completion changes
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/http_connect_handshaker.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c203
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.c5
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h29
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c68
-rw-r--r--src/core/lib/debug/stats_data.c175
-rw-r--r--src/core/lib/debug/stats_data.h73
-rw-r--r--src/core/lib/debug/stats_data.yaml41
-rw-r--r--src/core/lib/iomgr/combiner.c3
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c12
-rw-r--r--src/core/lib/iomgr/executor.c173
-rw-r--r--src/core/lib/iomgr/executor.h7
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c2
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c2
-rw-r--r--src/core/lib/iomgr/tcp_posix.c177
-rw-r--r--src/core/lib/security/transport/security_handshaker.c36
-rw-r--r--src/core/lib/surface/server.c8
-rw-r--r--src/core/lib/transport/transport.c3
19 files changed, 813 insertions, 209 deletions
diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.c b/src/core/ext/filters/client_channel/http_connect_handshaker.c
index d4c3bc0ad3..418bb41ef6 100644
--- a/src/core/ext/filters/client_channel/http_connect_handshaker.c
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.c
@@ -309,7 +309,7 @@ static void http_connect_handshaker_do_handshake(
grpc_httpcli_request request;
memset(&request, 0, sizeof(request));
request.host = server_name;
- request.http.method = "CONNECT";
+ request.http.method = (char*)"CONNECT";
request.http.path = server_name;
request.http.hdrs = headers;
request.http.hdr_count = num_headers;
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index adf9846336..3fd701fe2f 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -84,8 +84,6 @@ grpc_tracer_flag grpc_trace_chttp2_refcount =
GRPC_TRACER_INITIALIZER(false, "chttp2_refcount");
#endif
-static const grpc_transport_vtable vtable;
-
/* forward declarations of various callbacks that we'll build closures around */
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
@@ -248,6 +246,8 @@ void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
#endif
+static const grpc_transport_vtable *get_vtable(void);
+
static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
const grpc_channel_args *channel_args,
grpc_endpoint *ep, bool is_client) {
@@ -257,7 +257,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
- t->base.vtable = &vtable;
+ t->base.vtable = get_vtable();
t->ep = ep;
/* one ref is for destroy */
gpr_ref_init(&t->refs, 1);
@@ -557,11 +557,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
- GRPC_CLOSURE_INIT(&t->write_action, write_action, t,
- t->opt_target == GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT
- ? grpc_executor_scheduler
- : grpc_schedule_on_exec_ctx);
-
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
t->ping_state.is_delayed_ping_timer_set = false;
@@ -799,7 +794,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t,
uint32_t id) {
- return grpc_chttp2_stream_map_find(&t->stream_map, id);
+ return (grpc_chttp2_stream *)grpc_chttp2_stream_map_find(&t->stream_map, id);
}
grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,
@@ -858,6 +853,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
+ t->is_first_write_in_batch = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
GRPC_CLOSURE_SCHED(
exec_ctx,
@@ -876,46 +872,94 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
-void grpc_chttp2_become_writable(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_chttp2_stream_write_type stream_write_type, const char *reason) {
+void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ bool also_initiate_write, const char *reason) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
}
- switch (stream_write_type) {
- case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK:
- break;
- case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED:
- grpc_chttp2_initiate_write(exec_ctx, t, reason);
- break;
- case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED:
- grpc_chttp2_initiate_write(exec_ctx, t, reason);
- break;
+ if (also_initiate_write) {
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
}
}
+static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t,
+ bool early_results_scheduled,
+ bool partial_write) {
+ /* if it's not the first write in a batch, always offload to the executor:
+ we'll probably end up queuing against the kernel anyway, so we'll likely
+ get better latency overall if we switch writing work elsewhere and continue
+ with application work above */
+ if (!t->is_first_write_in_batch) {
+ return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
+ }
+ /* equivalently, if it's a partial write, we *know* we're going to be taking a
+ thread jump to write it because of the above, may as well do so
+ immediately */
+ if (partial_write) {
+ return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
+ }
+ switch (t->opt_target) {
+ case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
+ /* executor gives us the largest probability of being able to batch a
+ * write with others on this transport */
+ return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
+ case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
+ return grpc_schedule_on_exec_ctx;
+ }
+ GPR_UNREACHABLE_CODE(return NULL);
+}
+
+#define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i))
+static const char *begin_writing_desc(bool partial, bool inlined) {
+ switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) {
+ case WRITE_STATE_TUPLE_TO_INT(false, false):
+ return "begin write in background";
+ case WRITE_STATE_TUPLE_TO_INT(false, true):
+ return "begin write in current thread";
+ case WRITE_STATE_TUPLE_TO_INT(true, false):
+ return "begin partial write in background";
+ case WRITE_STATE_TUPLE_TO_INT(true, true):
+ return "begin partial write in current thread";
+ }
+ GPR_UNREACHABLE_CODE(return "bad state tuple");
+}
+
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("write_action_begin_locked", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
- switch (t->closed ? GRPC_CHTTP2_NOTHING_TO_WRITE
- : grpc_chttp2_begin_write(exec_ctx, t)) {
- case GRPC_CHTTP2_NOTHING_TO_WRITE:
- set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
- "begin writing nothing");
- GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
- break;
- case GRPC_CHTTP2_PARTIAL_WRITE:
- set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
- "begin writing partial");
- GRPC_CLOSURE_SCHED(exec_ctx, &t->write_action, GRPC_ERROR_NONE);
- break;
- case GRPC_CHTTP2_FULL_WRITE:
- set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
- "begin writing");
- GRPC_CLOSURE_SCHED(exec_ctx, &t->write_action, GRPC_ERROR_NONE);
- break;
+ grpc_chttp2_begin_write_result r;
+ if (t->closed) {
+ r.writing = false;
+ } else {
+ r = grpc_chttp2_begin_write(exec_ctx, t);
+ }
+ if (r.writing) {
+ if (r.partial) {
+ GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx);
+ }
+ if (!t->is_first_write_in_batch) {
+ GRPC_STATS_INC_HTTP2_WRITES_CONTINUED(exec_ctx);
+ }
+ grpc_closure_scheduler *scheduler =
+ write_scheduler(t, r.early_results_scheduled, r.partial);
+ if (scheduler != grpc_schedule_on_exec_ctx) {
+ GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED(exec_ctx);
+ }
+ set_write_state(
+ exec_ctx, t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
+ : GRPC_CHTTP2_WRITE_STATE_WRITING,
+ begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx));
+ GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_INIT(&t->write_action,
+ write_action, t, scheduler),
+ GRPC_ERROR_NONE);
+ } else {
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
+ "begin writing nothing");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
}
GPR_TIMER_END("write_action_begin_locked", 0);
}
@@ -958,7 +1002,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
- "continue writing [!covered]");
+ "continue writing");
+ t->is_first_write_in_batch = false;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
GRPC_CLOSURE_RUN(
exec_ctx,
@@ -1060,9 +1105,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
post_destructive_reclaimer(exec_ctx, t);
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
- "new_stream");
+ grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream");
}
/* cancel out streams that will never be started */
while (t->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -1111,12 +1154,14 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
if (GRPC_TRACER_ON(grpc_http_trace)) {
const char *errstr = grpc_error_string(error);
- gpr_log(GPR_DEBUG,
- "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s",
- closure,
- (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT),
- (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT),
- desc, errstr);
+ gpr_log(
+ GPR_DEBUG,
+ "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
+ "write_state=%s",
+ t, closure,
+ (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT),
+ (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), desc,
+ errstr, write_state_name(t->write_state));
}
if (error != GRPC_ERROR_NONE) {
if (closure->error_data.error == GRPC_ERROR_NONE) {
@@ -1157,9 +1202,7 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s) {
if (s->id != 0 && (!s->write_buffering ||
s->flow_controlled_buffer.length > t->write_buffer_size)) {
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
- "op.send_message");
+ grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
}
}
@@ -1198,8 +1241,12 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
cb->call_at_byte = notify_offset;
cb->closure = s->fetching_send_message_finished;
s->fetching_send_message_finished = NULL;
- cb->next = s->on_write_finished_cbs;
- s->on_write_finished_cbs = cb;
+ grpc_chttp2_write_cb **list =
+ s->fetching_send_message->flags & GRPC_WRITE_THROUGH
+ ? &s->on_write_finished_cbs
+ : &s->on_flow_controlled_cbs;
+ cb->next = *list;
+ *list = cb;
}
s->fetching_send_message = NULL;
return; /* early out */
@@ -1357,14 +1404,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
} else {
GPR_ASSERT(s->id != 0);
- grpc_chttp2_stream_write_type write_type =
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
+ bool initiate_write = true;
if (op->send_message &&
(op->payload->send_message.send_message->flags &
GRPC_WRITE_BUFFER_HINT)) {
- write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK;
+ initiate_write = false;
}
- grpc_chttp2_become_writable(exec_ctx, t, s, write_type,
+ grpc_chttp2_become_writable(exec_ctx, t, s, initiate_write,
"op.send_initial_metadata");
}
} else {
@@ -1473,8 +1519,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else if (s->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ grpc_chttp2_become_writable(exec_ctx, t, s, true,
"op.send_trailing_metadata");
}
}
@@ -1999,6 +2044,21 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error;
}
+static void flush_write_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, grpc_chttp2_write_cb **list,
+ grpc_error *error) {
+ while (*list) {
+ grpc_chttp2_write_cb *cb = *list;
+ *list = cb->next;
+ grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure,
+ GRPC_ERROR_REF(error),
+ "on_write_finished_cb");
+ cb->next = t->write_cb_pool;
+ t->write_cb_pool = cb;
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error) {
@@ -2018,16 +2078,9 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error),
"fetching_send_message_finished");
- while (s->on_write_finished_cbs) {
- grpc_chttp2_write_cb *cb = s->on_write_finished_cbs;
- s->on_write_finished_cbs = cb->next;
- grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure,
- GRPC_ERROR_REF(error),
- "on_write_finished_cb");
- cb->next = t->write_cb_pool;
- t->write_cb_pool = cb;
- }
- GRPC_ERROR_UNREF(error);
+ flush_write_list(exec_ctx, t, s, &s->on_write_finished_cbs,
+ GRPC_ERROR_REF(error));
+ flush_write_list(exec_ctx, t, s, &s->on_flow_controlled_cbs, error);
}
void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
@@ -2271,13 +2324,11 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
break;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ grpc_chttp2_become_writable(exec_ctx, t, s, true,
"immediate stream flowctl");
break;
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,
+ grpc_chttp2_become_writable(exec_ctx, t, s, false,
"queue stream flowctl");
break;
}
@@ -2390,9 +2441,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (t->flow_control.initial_window_update > 0) {
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
- grpc_chttp2_become_writable(
- exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
- "unstalled");
+ grpc_chttp2_become_writable(exec_ctx, t, s, true, "unstalled");
}
}
t->flow_control.initial_window_update = 0;
@@ -2983,6 +3032,8 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
destroy_transport,
chttp2_get_endpoint};
+static const grpc_transport_vtable *get_vtable(void) { return &vtable; }
+
grpc_transport *grpc_create_chttp2_transport(
grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
grpc_endpoint *ep, int is_client) {
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c
index dc166093a7..c94f7725bf 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.c
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c
@@ -99,9 +99,8 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
grpc_chttp2_flowctl_recv_stream_update(
&t->flow_control, &s->flow_control, received_update);
if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
- grpc_chttp2_become_writable(
- exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
- "stream.read_flow_control");
+ grpc_chttp2_become_writable(exec_ctx, t, s, true,
+ "stream.read_flow_control");
}
}
} else {
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 9fff30d54f..0fbedd1e56 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -262,6 +262,10 @@ struct grpc_chttp2_transport {
/** write execution state of the transport */
grpc_chttp2_write_state write_state;
+ /** is this the first write in a series of writes?
+ set when we initiate writing from idle, cleared when we
+ initiate writing from writing+more */
+ bool is_first_write_in_batch;
/** is the transport destroying itself? */
uint8_t destroying;
@@ -483,6 +487,7 @@ struct grpc_chttp2_stream {
grpc_slice fetching_slice;
int64_t next_message_end_offset;
int64_t flow_controlled_bytes_written;
+ int64_t flow_controlled_bytes_flowed;
grpc_closure complete_fetch_locked;
grpc_closure *fetching_send_message_finished;
@@ -555,6 +560,7 @@ struct grpc_chttp2_stream {
grpc_slice_buffer flow_controlled_buffer;
+ grpc_chttp2_write_cb *on_flow_controlled_cbs;
grpc_chttp2_write_cb *on_write_finished_cbs;
grpc_chttp2_write_cb *finish_after_write;
size_t sending_bytes;
@@ -595,10 +601,13 @@ struct grpc_chttp2_stream {
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, const char *reason);
-typedef enum {
- GRPC_CHTTP2_NOTHING_TO_WRITE,
- GRPC_CHTTP2_PARTIAL_WRITE,
- GRPC_CHTTP2_FULL_WRITE,
+typedef struct {
+ /** are we writing? */
+ bool writing;
+ /** if writing: was it a complete flush (false) or a partial flush (true) */
+ bool partial;
+ /** did we queue any completions as part of beginning the write */
+ bool early_results_scheduled;
} grpc_chttp2_begin_write_result;
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
@@ -840,22 +849,12 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
-typedef enum {
- /* don't initiate a transport write, but piggyback on the next one */
- GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,
- /* initiate a covered write */
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
- /* initiate an uncovered write */
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED
-} grpc_chttp2_stream_write_type;
-
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
- grpc_chttp2_stream_write_type type,
- const char *reason);
+ bool also_initiate_write, const char *reason);
void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 148d649a46..6c12c91365 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -106,7 +106,8 @@ grpc_error *grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
return err;
}
++cur;
- ++t->deframe_state;
+ t->deframe_state =
+ (grpc_chttp2_deframe_transport_state)(1 + (int)t->deframe_state);
}
if (cur == end) {
return GRPC_ERROR_NONE;
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 5e9d97d485..fa224a49a4 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -123,15 +123,18 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
(t->ping_state.pings_before_data_required != 0);
}
-static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+static bool update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, int64_t send_bytes,
- grpc_chttp2_write_cb **list, grpc_error *error) {
+ grpc_chttp2_write_cb **list, int64_t *ctr,
+ grpc_error *error) {
+ bool sched_any = false;
grpc_chttp2_write_cb *cb = *list;
*list = NULL;
- s->flow_controlled_bytes_written += send_bytes;
+ *ctr += send_bytes;
while (cb) {
grpc_chttp2_write_cb *next = cb->next;
- if (cb->call_at_byte <= s->flow_controlled_bytes_written) {
+ if (cb->call_at_byte <= *ctr) {
+ sched_any = true;
finish_write_cb(exec_ctx, t, s, cb, GRPC_ERROR_REF(error));
} else {
add_to_write_list(list, cb);
@@ -139,6 +142,7 @@ static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
cb = next;
}
GRPC_ERROR_UNREF(error);
+ return sched_any;
}
static bool stream_ref_if_not_destroyed(gpr_refcount *r) {
@@ -164,6 +168,13 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
+ /* stats histogram counters: we increment these throughout this function,
+ and at the end publish to the central stats histograms */
+ int flow_control_writes = 0;
+ int initial_metadata_writes = 0;
+ int trailing_metadata_writes = 0;
+ int message_writes = 0;
+
GRPC_STATS_INC_HTTP2_WRITES_BEGUN(exec_ctx);
GPR_TIMER_BEGIN("grpc_chttp2_begin_write", 0);
@@ -177,6 +188,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
t->force_send_settings = 0;
t->dirtied_local_settings = 0;
t->sent_local_settings = 1;
+ GRPC_STATS_INC_HTTP2_SETTINGS_WRITES(exec_ctx);
}
/* simple writes are queued to qbuf, and flushed here */
@@ -196,13 +208,13 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
}
}
- bool partial_write = false;
+ grpc_chttp2_begin_write_result result = {false, false, false};
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
while (true) {
if (t->outbuf.length > target_write_size(t)) {
- partial_write = true;
+ result.partial = true;
break;
}
@@ -246,7 +258,6 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
.stats = &s->stats.outgoing};
grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor, NULL, 0,
s->send_initial_metadata, &hopt, &t->outbuf);
- now_writing = true;
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
@@ -254,6 +265,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
}
+ initial_metadata_writes++;
} else {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
@@ -269,10 +281,15 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
[num_extra_headers_for_trailing_metadata++] =
&s->send_initial_metadata->idx.named.content_type->md;
}
+ trailing_metadata_writes++;
}
s->send_initial_metadata = NULL;
s->sent_initial_metadata = true;
sent_initial_metadata = true;
+ result.early_results_scheduled = true;
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_NONE,
+ "send_initial_metadata_finished");
}
/* send any window updates */
uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update(
@@ -288,6 +305,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
}
+ flow_control_writes++;
}
if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */
@@ -306,6 +324,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (max_outgoing > 0) {
bool is_last_data_frame = false;
bool is_last_frame = false;
+ size_t sending_bytes_before = s->sending_bytes;
if (s->stream_compression_send_enabled) {
while ((s->flow_controlled_buffer.length > 0 ||
s->compressed_data_buffer->length > 0) &&
@@ -373,6 +392,11 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
&s->stats.outgoing));
}
}
+ result.early_results_scheduled |=
+ update_list(exec_ctx, t, s,
+ (int64_t)(s->sending_bytes - sending_bytes_before),
+ &s->on_flow_controlled_cbs,
+ &s->flow_controlled_bytes_flowed, GRPC_ERROR_NONE);
now_writing = true;
if (s->flow_controlled_buffer.length > 0 ||
(s->stream_compression_send_enabled &&
@@ -380,6 +404,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t, s);
}
+ message_writes++;
} else if (t->flow_control.remote_window == 0) {
grpc_chttp2_list_add_stalled_by_transport(t, s);
now_writing = true;
@@ -415,6 +440,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
num_extra_headers_for_trailing_metadata,
s->send_trailing_metadata, &hopt,
&t->outbuf);
+ trailing_metadata_writes++;
}
s->send_trailing_metadata = NULL;
s->sent_trailing_metadata = true;
@@ -424,10 +450,22 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing));
}
now_writing = true;
+ result.early_results_scheduled = true;
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, t, s, &s->send_trailing_metadata_finished,
+ GRPC_ERROR_NONE, "send_trailing_metadata_finished");
}
}
if (now_writing) {
+ GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(
+ exec_ctx, initial_metadata_writes);
+ GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(exec_ctx, message_writes);
+ GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(
+ exec_ctx, trailing_metadata_writes);
+ GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(exec_ctx,
+ flow_control_writes);
+
if (!grpc_chttp2_list_add_writing_stream(t, s)) {
/* already in writing list: drop ref */
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing");
@@ -465,9 +503,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
GPR_TIMER_END("grpc_chttp2_begin_write", 0);
- return t->outbuf.count > 0 ? (partial_write ? GRPC_CHTTP2_PARTIAL_WRITE
- : GRPC_CHTTP2_FULL_WRITE)
- : GRPC_CHTTP2_NOTHING_TO_WRITE;
+ result.writing = t->outbuf.count > 0;
+ return result;
}
void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -476,20 +513,13 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
- if (s->sent_initial_metadata) {
- grpc_chttp2_complete_closure_step(
- exec_ctx, t, s, &s->send_initial_metadata_finished,
- GRPC_ERROR_REF(error), "send_initial_metadata_finished");
- }
if (s->sending_bytes != 0) {
update_list(exec_ctx, t, s, (int64_t)s->sending_bytes,
- &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
+ &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
+ GRPC_ERROR_REF(error));
s->sending_bytes = 0;
}
if (s->sent_trailing_metadata) {
- grpc_chttp2_complete_closure_step(
- exec_ctx, t, s, &s->send_trailing_metadata_finished,
- GRPC_ERROR_REF(error), "send_trailing_metadata_finished");
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1,
GRPC_ERROR_REF(error));
}
diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c
index 15ccaf21c4..3fd8ee38ef 100644
--- a/src/core/lib/debug/stats_data.c
+++ b/src/core/lib/debug/stats_data.c
@@ -30,6 +30,8 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"histogram_slow_lookups",
"syscall_write",
"syscall_read",
+ "tcp_backup_pollers_created",
+ "tcp_backup_poller_polls",
"http2_op_batches",
"http2_op_cancel",
"http2_op_send_initial_metadata",
@@ -38,16 +40,22 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"http2_op_recv_initial_metadata",
"http2_op_recv_message",
"http2_op_recv_trailing_metadata",
+ "http2_settings_writes",
"http2_pings_sent",
"http2_writes_begun",
+ "http2_writes_offloaded",
+ "http2_writes_continued",
+ "http2_partial_writes",
"combiner_locks_initiated",
"combiner_locks_scheduled_items",
"combiner_locks_scheduled_final_items",
"combiner_locks_offloaded",
- "executor_scheduled_items",
+ "executor_scheduled_short_items",
+ "executor_scheduled_long_items",
"executor_scheduled_to_self",
"executor_wakeup_initiated",
"executor_queue_drained",
+ "executor_push_retries",
};
const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of client side calls created by this process",
@@ -59,6 +67,8 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of write syscalls (or equivalent - eg sendmsg) made by this "
"process",
"Number of read syscalls (or equivalent - eg recvmsg) made by this process",
+ "Number of times a backup poller has been created (this can be expensive)",
+ "Number of polls performed on the backup poller",
"Number of batches received by HTTP2 transport",
"Number of cancelations received by HTTP2 transport",
"Number of batches containing send initial metadata",
@@ -67,20 +77,39 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of batches containing receive initial metadata",
"Number of batches containing receive message",
"Number of batches containing receive trailing metadata",
- "Number of HTTP2 pings sent by process", "Number of HTTP2 writes initiated",
+ "Number of settings frames sent", "Number of HTTP2 pings sent by process",
+ "Number of HTTP2 writes initiated",
+ "Number of HTTP2 writes offloaded to the executor from application threads",
+ "Number of HTTP2 writes that finished seeing more data needed to be "
+ "written",
+ "Number of HTTP2 writes that were made knowing there was still more data "
+ "to be written (we cap maximum write size to syscall_write)",
"Number of combiner lock entries by process (first items queued to a "
"combiner)",
"Number of items scheduled against combiner locks",
"Number of final items scheduled against combiner locks",
"Number of combiner locks offloaded to different threads",
- "Number of closures scheduled against the executor (gRPC thread pool)",
+ "Number of finite runtime closures scheduled against the executor (gRPC "
+ "thread pool)",
+ "Number of potentially infinite runtime closures scheduled against the "
+ "executor (gRPC thread pool)",
"Number of closures scheduled by the executor to the executor",
"Number of thread wakeups initiated within the executor",
"Number of times an executor queue was drained",
+ "Number of times we raced and were forced to retry pushing a closure to "
+ "the executor",
};
const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = {
- "tcp_write_size", "tcp_write_iov_size", "tcp_read_size",
- "tcp_read_offer", "tcp_read_offer_iov_size", "http2_send_message_size",
+ "tcp_write_size",
+ "tcp_write_iov_size",
+ "tcp_read_size",
+ "tcp_read_offer",
+ "tcp_read_offer_iov_size",
+ "http2_send_message_size",
+ "http2_send_initial_metadata_per_write",
+ "http2_send_message_per_write",
+ "http2_send_trailing_metadata_per_write",
+ "http2_send_flowctl_per_write",
};
const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = {
"Number of bytes offered to each syscall_write",
@@ -89,6 +118,10 @@ const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = {
"Number of bytes offered to each syscall_read",
"Number of byte segments offered to each syscall_read",
"Size of messages received by HTTP2 transport",
+ "Number of streams initiated written per TCP write",
+ "Number of streams whose payload was written per TCP write",
+ "Number of streams terminated per TCP write",
+ "Number of flow control updates written per TCP write",
};
const int grpc_stats_table_0[65] = {
0, 1, 2, 3, 4, 6, 8, 11,
@@ -273,15 +306,135 @@ void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx,
grpc_stats_histo_find_bucket_slow(
(exec_ctx), value, grpc_stats_table_0, 64));
}
-const int grpc_stats_histo_buckets[6] = {64, 64, 64, 64, 64, 64};
-const int grpc_stats_histo_start[6] = {0, 64, 128, 192, 256, 320};
-const int *const grpc_stats_histo_bucket_boundaries[6] = {
+void grpc_stats_inc_http2_send_initial_metadata_per_write(
+ grpc_exec_ctx *exec_ctx, int value) {
+ value = GPR_CLAMP(value, 0, 1024);
+ if (value < 13) {
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE,
+ value);
+ return;
+ }
+ union {
+ double dbl;
+ uint64_t uint;
+ } _val, _bkt;
+ _val.dbl = value;
+ if (_val.uint < 4637863191261478912ull) {
+ int bucket =
+ grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13;
+ _bkt.dbl = grpc_stats_table_2[bucket];
+ bucket -= (_val.uint < _bkt.uint);
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE,
+ bucket);
+ return;
+ }
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE,
+ grpc_stats_histo_find_bucket_slow((exec_ctx), value, grpc_stats_table_2,
+ 64));
+}
+void grpc_stats_inc_http2_send_message_per_write(grpc_exec_ctx *exec_ctx,
+ int value) {
+ value = GPR_CLAMP(value, 0, 1024);
+ if (value < 13) {
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE, value);
+ return;
+ }
+ union {
+ double dbl;
+ uint64_t uint;
+ } _val, _bkt;
+ _val.dbl = value;
+ if (_val.uint < 4637863191261478912ull) {
+ int bucket =
+ grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13;
+ _bkt.dbl = grpc_stats_table_2[bucket];
+ bucket -= (_val.uint < _bkt.uint);
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE, bucket);
+ return;
+ }
+ GRPC_STATS_INC_HISTOGRAM((exec_ctx),
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE,
+ grpc_stats_histo_find_bucket_slow(
+ (exec_ctx), value, grpc_stats_table_2, 64));
+}
+void grpc_stats_inc_http2_send_trailing_metadata_per_write(
+ grpc_exec_ctx *exec_ctx, int value) {
+ value = GPR_CLAMP(value, 0, 1024);
+ if (value < 13) {
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE,
+ value);
+ return;
+ }
+ union {
+ double dbl;
+ uint64_t uint;
+ } _val, _bkt;
+ _val.dbl = value;
+ if (_val.uint < 4637863191261478912ull) {
+ int bucket =
+ grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13;
+ _bkt.dbl = grpc_stats_table_2[bucket];
+ bucket -= (_val.uint < _bkt.uint);
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE,
+ bucket);
+ return;
+ }
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE,
+ grpc_stats_histo_find_bucket_slow((exec_ctx), value, grpc_stats_table_2,
+ 64));
+}
+void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx,
+ int value) {
+ value = GPR_CLAMP(value, 0, 1024);
+ if (value < 13) {
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE, value);
+ return;
+ }
+ union {
+ double dbl;
+ uint64_t uint;
+ } _val, _bkt;
+ _val.dbl = value;
+ if (_val.uint < 4637863191261478912ull) {
+ int bucket =
+ grpc_stats_table_3[((_val.uint - 4623507967449235456ull) >> 48)] + 13;
+ _bkt.dbl = grpc_stats_table_2[bucket];
+ bucket -= (_val.uint < _bkt.uint);
+ GRPC_STATS_INC_HISTOGRAM(
+ (exec_ctx), GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE, bucket);
+ return;
+ }
+ GRPC_STATS_INC_HISTOGRAM((exec_ctx),
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE,
+ grpc_stats_histo_find_bucket_slow(
+ (exec_ctx), value, grpc_stats_table_2, 64));
+}
+const int grpc_stats_histo_buckets[10] = {64, 64, 64, 64, 64,
+ 64, 64, 64, 64, 64};
+const int grpc_stats_histo_start[10] = {0, 64, 128, 192, 256,
+ 320, 384, 448, 512, 576};
+const int *const grpc_stats_histo_bucket_boundaries[10] = {
+ grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0,
grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0,
- grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_0};
-void (*const grpc_stats_inc_histogram[6])(grpc_exec_ctx *exec_ctx, int x) = {
+ grpc_stats_table_2, grpc_stats_table_2, grpc_stats_table_2,
+ grpc_stats_table_2};
+void (*const grpc_stats_inc_histogram[10])(grpc_exec_ctx *exec_ctx, int x) = {
grpc_stats_inc_tcp_write_size,
grpc_stats_inc_tcp_write_iov_size,
grpc_stats_inc_tcp_read_size,
grpc_stats_inc_tcp_read_offer,
grpc_stats_inc_tcp_read_offer_iov_size,
- grpc_stats_inc_http2_send_message_size};
+ grpc_stats_inc_http2_send_message_size,
+ grpc_stats_inc_http2_send_initial_metadata_per_write,
+ grpc_stats_inc_http2_send_message_per_write,
+ grpc_stats_inc_http2_send_trailing_metadata_per_write,
+ grpc_stats_inc_http2_send_flowctl_per_write};
diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h
index 3151e5ab5c..b7c15c08a5 100644
--- a/src/core/lib/debug/stats_data.h
+++ b/src/core/lib/debug/stats_data.h
@@ -32,6 +32,8 @@ typedef enum {
GRPC_STATS_COUNTER_HISTOGRAM_SLOW_LOOKUPS,
GRPC_STATS_COUNTER_SYSCALL_WRITE,
GRPC_STATS_COUNTER_SYSCALL_READ,
+ GRPC_STATS_COUNTER_TCP_BACKUP_POLLERS_CREATED,
+ GRPC_STATS_COUNTER_TCP_BACKUP_POLLER_POLLS,
GRPC_STATS_COUNTER_HTTP2_OP_BATCHES,
GRPC_STATS_COUNTER_HTTP2_OP_CANCEL,
GRPC_STATS_COUNTER_HTTP2_OP_SEND_INITIAL_METADATA,
@@ -40,16 +42,22 @@ typedef enum {
GRPC_STATS_COUNTER_HTTP2_OP_RECV_INITIAL_METADATA,
GRPC_STATS_COUNTER_HTTP2_OP_RECV_MESSAGE,
GRPC_STATS_COUNTER_HTTP2_OP_RECV_TRAILING_METADATA,
+ GRPC_STATS_COUNTER_HTTP2_SETTINGS_WRITES,
GRPC_STATS_COUNTER_HTTP2_PINGS_SENT,
GRPC_STATS_COUNTER_HTTP2_WRITES_BEGUN,
+ GRPC_STATS_COUNTER_HTTP2_WRITES_OFFLOADED,
+ GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED,
+ GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES,
GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED,
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED,
- GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_ITEMS,
+ GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS,
+ GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_LONG_ITEMS,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF,
GRPC_STATS_COUNTER_EXECUTOR_WAKEUP_INITIATED,
GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED,
+ GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES,
GRPC_STATS_COUNTER_COUNT
} grpc_stats_counters;
extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT];
@@ -61,6 +69,10 @@ typedef enum {
GRPC_STATS_HISTOGRAM_TCP_READ_OFFER,
GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE,
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE,
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE,
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE,
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE,
GRPC_STATS_HISTOGRAM_COUNT
} grpc_stats_histograms;
extern const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT];
@@ -78,7 +90,15 @@ typedef enum {
GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_BUCKETS = 64,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_FIRST_SLOT = 320,
GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_BUCKETS = 64,
- GRPC_STATS_HISTOGRAM_BUCKETS = 384
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE_FIRST_SLOT = 384,
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_INITIAL_METADATA_PER_WRITE_BUCKETS = 64,
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE_FIRST_SLOT = 448,
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE_BUCKETS = 64,
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE_FIRST_SLOT = 512,
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE_BUCKETS = 64,
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_FIRST_SLOT = 576,
+ GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_BUCKETS = 64,
+ GRPC_STATS_HISTOGRAM_BUCKETS = 640
} grpc_stats_histogram_constants;
#define GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED)
@@ -94,6 +114,11 @@ typedef enum {
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_WRITE)
#define GRPC_STATS_INC_SYSCALL_READ(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_READ)
+#define GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), \
+ GRPC_STATS_COUNTER_TCP_BACKUP_POLLERS_CREATED)
+#define GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_TCP_BACKUP_POLLER_POLLS)
#define GRPC_STATS_INC_HTTP2_OP_BATCHES(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_OP_BATCHES)
#define GRPC_STATS_INC_HTTP2_OP_CANCEL(exec_ctx) \
@@ -114,10 +139,18 @@ typedef enum {
#define GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_HTTP2_OP_RECV_TRAILING_METADATA)
+#define GRPC_STATS_INC_HTTP2_SETTINGS_WRITES(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_SETTINGS_WRITES)
#define GRPC_STATS_INC_HTTP2_PINGS_SENT(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_PINGS_SENT)
#define GRPC_STATS_INC_HTTP2_WRITES_BEGUN(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_WRITES_BEGUN)
+#define GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_WRITES_OFFLOADED)
+#define GRPC_STATS_INC_HTTP2_WRITES_CONTINUED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED)
+#define GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES)
#define GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED)
@@ -130,9 +163,12 @@ typedef enum {
#define GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED)
-#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_ITEMS(exec_ctx) \
- GRPC_STATS_INC_COUNTER((exec_ctx), \
- GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_ITEMS)
+#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), \
+ GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS)
+#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), \
+ GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_LONG_ITEMS)
#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF)
@@ -141,6 +177,8 @@ typedef enum {
GRPC_STATS_COUNTER_EXECUTOR_WAKEUP_INITIATED)
#define GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED)
+#define GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES)
#define GRPC_STATS_INC_TCP_WRITE_SIZE(exec_ctx, value) \
grpc_stats_inc_tcp_write_size((exec_ctx), (int)(value))
void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int x);
@@ -159,10 +197,27 @@ void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, int x);
#define GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(exec_ctx, value) \
grpc_stats_inc_http2_send_message_size((exec_ctx), (int)(value))
void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, int x);
-extern const int grpc_stats_histo_buckets[6];
-extern const int grpc_stats_histo_start[6];
-extern const int *const grpc_stats_histo_bucket_boundaries[6];
-extern void (*const grpc_stats_inc_histogram[6])(grpc_exec_ctx *exec_ctx,
+#define GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(exec_ctx, value) \
+ grpc_stats_inc_http2_send_initial_metadata_per_write((exec_ctx), (int)(value))
+void grpc_stats_inc_http2_send_initial_metadata_per_write(
+ grpc_exec_ctx *exec_ctx, int x);
+#define GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(exec_ctx, value) \
+ grpc_stats_inc_http2_send_message_per_write((exec_ctx), (int)(value))
+void grpc_stats_inc_http2_send_message_per_write(grpc_exec_ctx *exec_ctx,
+ int x);
+#define GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(exec_ctx, value) \
+ grpc_stats_inc_http2_send_trailing_metadata_per_write((exec_ctx), \
+ (int)(value))
+void grpc_stats_inc_http2_send_trailing_metadata_per_write(
+ grpc_exec_ctx *exec_ctx, int x);
+#define GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(exec_ctx, value) \
+ grpc_stats_inc_http2_send_flowctl_per_write((exec_ctx), (int)(value))
+void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx,
int x);
+extern const int grpc_stats_histo_buckets[10];
+extern const int grpc_stats_histo_start[10];
+extern const int *const grpc_stats_histo_bucket_boundaries[10];
+extern void (*const grpc_stats_inc_histogram[10])(grpc_exec_ctx *exec_ctx,
+ int x);
#endif /* GRPC_CORE_LIB_DEBUG_STATS_DATA_H */
diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml
index 53f6ff0074..a9d71f4fcb 100644
--- a/src/core/lib/debug/stats_data.yaml
+++ b/src/core/lib/debug/stats_data.yaml
@@ -54,6 +54,10 @@
max: 1024
buckets: 64
doc: Number of byte segments offered to each syscall_read
+- counter: tcp_backup_pollers_created
+ doc: Number of times a backup poller has been created (this can be expensive)
+- counter: tcp_backup_poller_polls
+ doc: Number of polls performed on the backup poller
# chttp2
- counter: http2_op_batches
doc: Number of batches received by HTTP2 transport
@@ -75,10 +79,36 @@
max: 16777216
buckets: 64
doc: Size of messages received by HTTP2 transport
+- histogram: http2_send_initial_metadata_per_write
+ max: 1024
+ buckets: 64
+ doc: Number of streams initiated written per TCP write
+- histogram: http2_send_message_per_write
+ max: 1024
+ buckets: 64
+ doc: Number of streams whose payload was written per TCP write
+- histogram: http2_send_trailing_metadata_per_write
+ max: 1024
+ buckets: 64
+ doc: Number of streams terminated per TCP write
+- histogram: http2_send_flowctl_per_write
+ max: 1024
+ buckets: 64
+ doc: Number of flow control updates written per TCP write
+- counter: http2_settings_writes
+ doc: Number of settings frames sent
- counter: http2_pings_sent
doc: Number of HTTP2 pings sent by process
- counter: http2_writes_begun
doc: Number of HTTP2 writes initiated
+- counter: http2_writes_offloaded
+ doc: Number of HTTP2 writes offloaded to the executor from application threads
+- counter: http2_writes_continued
+ doc: Number of HTTP2 writes that finished seeing more data needed to be
+ written
+- counter: http2_partial_writes
+ doc: Number of HTTP2 writes that were made knowing there was still more data
+ to be written (we cap maximum write size to syscall_write)
# combiner locks
- counter: combiner_locks_initiated
doc: Number of combiner lock entries by process
@@ -90,11 +120,18 @@
- counter: combiner_locks_offloaded
doc: Number of combiner locks offloaded to different threads
# executor
-- counter: executor_scheduled_items
- doc: Number of closures scheduled against the executor (gRPC thread pool)
+- counter: executor_scheduled_short_items
+ doc: Number of finite runtime closures scheduled against the executor
+ (gRPC thread pool)
+- counter: executor_scheduled_long_items
+ doc: Number of potentially infinite runtime closures scheduled against the
+ executor (gRPC thread pool)
- counter: executor_scheduled_to_self
doc: Number of closures scheduled by the executor to the executor
- counter: executor_wakeup_initiated
doc: Number of thread wakeups initiated within the executor
- counter: executor_queue_drained
doc: Number of times an executor queue was drained
+- counter: executor_push_retries
+ doc: Number of times we raced and were forced to retry pushing a closure to
+ the executor
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 3d42f6d920..360967f3ba 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -81,7 +81,8 @@ grpc_combiner *grpc_combiner_create(void) {
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
- GRPC_CLOSURE_INIT(&lock->offload, offload, lock, grpc_executor_scheduler);
+ GRPC_CLOSURE_INIT(&lock->offload, offload, lock,
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 6a083f8ade..bcf1d9001b 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -989,6 +989,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
r = grpc_poll_function(pfds, pfd_count, timeout);
GRPC_SCHEDULING_END_BLOCKING_REGION;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r);
+ }
+
if (r < 0) {
if (errno != EINTR) {
work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
@@ -1009,6 +1013,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "%p: got_wakeup", pollset);
+ }
work_combine_error(
&error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
}
@@ -1016,6 +1023,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (watchers[i].fd == NULL) {
fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
} else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "%p got_event: %d r:%d w:%d [%d]", pollset,
+ pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
+ (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
+ }
fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
pfds[i].revents & POLLOUT_CHECK, pollset);
}
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index eb8d55678a..892385d7d7 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -40,6 +40,7 @@ typedef struct {
grpc_closure_list elems;
size_t depth;
bool shutdown;
+ bool queued_long_job;
gpr_thd_id id;
} thread_state;
@@ -50,6 +51,9 @@ static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
GPR_TLS_DECL(g_this_thread_state);
+static grpc_tracer_flag executor_trace =
+ GRPC_TRACER_INITIALIZER(false, "executor");
+
static void executor_thread(void *arg);
static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
@@ -59,6 +63,14 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
while (c != NULL) {
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
+ if (GRPC_TRACER_ON(executor_trace)) {
+#ifndef NDEBUG
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
+ c->file_created, c->line_created);
+#else
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
+#endif
+ }
#ifndef NDEBUG
c->scheduled = false;
#endif
@@ -66,6 +78,7 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
GRPC_ERROR_UNREF(error);
c = next;
n++;
+ grpc_exec_ctx_flush(exec_ctx);
}
return n;
@@ -121,6 +134,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
}
void grpc_executor_init(grpc_exec_ctx *exec_ctx) {
+ grpc_register_tracer(&executor_trace);
gpr_atm_no_barrier_store(&g_cur_threads, 0);
grpc_executor_set_threading(exec_ctx, true);
}
@@ -138,12 +152,21 @@ static void executor_thread(void *arg) {
size_t subtract_depth = 0;
for (;;) {
+ if (GRPC_TRACER_ON(executor_trace)) {
+ gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
+ (int)(ts - g_thread_state), subtract_depth);
+ }
gpr_mu_lock(&ts->mu);
ts->depth -= subtract_depth;
while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
+ ts->queued_long_job = false;
gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
if (ts->shutdown) {
+ if (GRPC_TRACER_ON(executor_trace)) {
+ gpr_log(GPR_DEBUG, "EXECUTOR[%d]: shutdown",
+ (int)(ts - g_thread_state));
+ }
gpr_mu_unlock(&ts->mu);
break;
}
@@ -151,52 +174,128 @@ static void executor_thread(void *arg) {
grpc_closure_list exec = ts->elems;
ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
+ if (GRPC_TRACER_ON(executor_trace)) {
+ gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state));
+ }
subtract_depth = run_closures(&exec_ctx, exec);
- grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
}
static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
- GRPC_STATS_INC_EXECUTOR_SCHEDULED_ITEMS(exec_ctx);
- if (cur_thread_count == 0) {
- grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
- return;
- }
- thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
- if (ts == NULL) {
- ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
+ grpc_error *error, bool is_short) {
+ bool retry_push;
+ if (is_short) {
+ GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx);
} else {
- GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx);
- }
- gpr_mu_lock(&ts->mu);
- if (grpc_closure_list_empty(ts->elems)) {
- GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx);
- gpr_cv_signal(&ts->cv);
+ GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(exec_ctx);
}
- grpc_closure_list_append(&ts->elems, closure, error);
- ts->depth++;
- bool try_new_thread = ts->depth > MAX_DEPTH &&
- cur_thread_count < g_max_threads && !ts->shutdown;
- gpr_mu_unlock(&ts->mu);
- if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
- cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
- if (cur_thread_count < g_max_threads) {
- gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
-
- gpr_thd_options opt = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&opt);
- gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
- &g_thread_state[cur_thread_count], &opt);
+ do {
+ retry_push = false;
+ size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
+ if (cur_thread_count == 0) {
+ if (GRPC_TRACER_ON(executor_trace)) {
+#ifndef NDEBUG
+ gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
+ closure, closure->file_created, closure->line_created);
+#else
+ gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure);
+#endif
+ }
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+ return;
}
- gpr_spinlock_unlock(&g_adding_thread_lock);
- }
+ thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
+ if (ts == NULL) {
+ ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
+ } else {
+ GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx);
+ }
+ thread_state *orig_ts = ts;
+
+ bool try_new_thread;
+ for (;;) {
+ if (GRPC_TRACER_ON(executor_trace)) {
+#ifndef NDEBUG
+ gpr_log(
+ GPR_DEBUG,
+ "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d",
+ closure, is_short ? "short" : "long", closure->file_created,
+ closure->line_created, (int)(ts - g_thread_state));
+#else
+ gpr_log(GPR_DEBUG, "EXECUTOR: try to schedule %p (%s) to thread %d",
+ closure, is_short ? "short" : "long",
+ (int)(ts - g_thread_state));
+#endif
+ }
+ gpr_mu_lock(&ts->mu);
+ if (ts->queued_long_job) {
+ // if there's a long job queued, we never queue anything else to this
+ // queue (since long jobs can take 'infinite' time and we need to
+ // guarantee no starvation)
+ // ... spin through queues and try again
+ gpr_mu_unlock(&ts->mu);
+ size_t idx = (size_t)(ts - g_thread_state);
+ ts = &g_thread_state[(idx + 1) % cur_thread_count];
+ if (ts == orig_ts) {
+ retry_push = true;
+ try_new_thread = true;
+ break;
+ }
+ continue;
+ }
+ if (grpc_closure_list_empty(ts->elems)) {
+ GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx);
+ gpr_cv_signal(&ts->cv);
+ }
+ grpc_closure_list_append(&ts->elems, closure, error);
+ ts->depth++;
+ try_new_thread = ts->depth > MAX_DEPTH &&
+ cur_thread_count < g_max_threads && !ts->shutdown;
+ if (!is_short) ts->queued_long_job = true;
+ gpr_mu_unlock(&ts->mu);
+ break;
+ }
+ if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
+ cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
+ if (cur_thread_count < g_max_threads) {
+ gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
+
+ gpr_thd_options opt = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&opt);
+ gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
+ &g_thread_state[cur_thread_count], &opt);
+ }
+ gpr_spinlock_unlock(&g_adding_thread_lock);
+ }
+ if (retry_push) {
+ GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx);
+ }
+ } while (retry_push);
}
-static const grpc_closure_scheduler_vtable executor_vtable = {
- executor_push, executor_push, "executor"};
-static grpc_closure_scheduler executor_scheduler = {&executor_vtable};
-grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler;
+static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ executor_push(exec_ctx, closure, error, true);
+}
+
+static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ executor_push(exec_ctx, closure, error, false);
+}
+
+static const grpc_closure_scheduler_vtable executor_vtable_short = {
+ executor_push_short, executor_push_short, "executor"};
+static grpc_closure_scheduler executor_scheduler_short = {
+ &executor_vtable_short};
+
+static const grpc_closure_scheduler_vtable executor_vtable_long = {
+ executor_push_long, executor_push_long, "executor"};
+static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};
+
+grpc_closure_scheduler *grpc_executor_scheduler(
+ grpc_executor_job_length length) {
+ return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
+ : &executor_scheduler_long;
+}
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index c3382a0a12..0412c02790 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -21,6 +21,11 @@
#include "src/core/lib/iomgr/closure.h"
+typedef enum {
+ GRPC_EXECUTOR_SHORT,
+ GRPC_EXECUTOR_LONG
+} grpc_executor_job_length;
+
/** Initialize the global executor.
*
* This mechanism is meant to outsource work (grpc_closure instances) to a
@@ -28,7 +33,7 @@
* non-blocking solution available. */
void grpc_executor_init(grpc_exec_ctx *exec_ctx);
-extern grpc_closure_scheduler *grpc_executor_scheduler;
+grpc_closure_scheduler *grpc_executor_scheduler(grpc_executor_job_length);
/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index b515b8f1e6..082e3b7947 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -177,7 +177,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_resolved_addresses **addrs) {
request *r = (request *)gpr_malloc(sizeof(request));
GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
- grpc_executor_scheduler);
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index 45cfd7248d..0cb0029f4e 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -159,7 +159,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_resolved_addresses **addresses) {
request *r = gpr_malloc(sizeof(request));
GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
- grpc_executor_scheduler);
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index ba0be9116f..7e271294fd 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -43,6 +43,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@@ -90,8 +91,8 @@ typedef struct {
grpc_closure *release_fd_cb;
int *release_fd;
- grpc_closure read_closure;
- grpc_closure write_closure;
+ grpc_closure read_done_closure;
+ grpc_closure write_done_closure;
char *peer_string;
@@ -99,6 +100,148 @@ typedef struct {
grpc_resource_user_slice_allocator slice_allocator;
} grpc_tcp;
+typedef struct backup_poller {
+ gpr_mu *pollset_mu;
+ grpc_closure run_poller;
+} backup_poller;
+
+#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset *)((b) + 1))
+
+static gpr_atm g_uncovered_notifications_pending;
+static gpr_atm g_backup_poller; /* backup_poller* */
+
+static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
+ grpc_error *error);
+static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
+ grpc_error *error);
+static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx *exec_ctx,
+ void *arg /* grpc_tcp */,
+ grpc_error *error);
+
+static void done_poller(grpc_exec_ctx *exec_ctx, void *bp,
+ grpc_error *error_ignored) {
+ backup_poller *p = (backup_poller *)bp;
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p destroy", p);
+ }
+ grpc_pollset_destroy(exec_ctx, BACKUP_POLLER_POLLSET(p));
+ gpr_free(p);
+}
+
+static void run_poller(grpc_exec_ctx *exec_ctx, void *bp,
+ grpc_error *error_ignored) {
+ backup_poller *p = (backup_poller *)bp;
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p);
+ }
+ gpr_mu_lock(p->pollset_mu);
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec deadline =
+ gpr_time_add(now, gpr_time_from_seconds(10, GPR_TIMESPAN));
+ GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx);
+ GRPC_LOG_IF_ERROR("backup_poller:pollset_work",
+ grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL,
+ now, deadline));
+ gpr_mu_unlock(p->pollset_mu);
+ /* last "uncovered" notification is the ref that keeps us polling, if we get
+ * there try a cas to release it */
+ if (gpr_atm_no_barrier_load(&g_uncovered_notifications_pending) == 1 &&
+ gpr_atm_full_cas(&g_uncovered_notifications_pending, 1, 0)) {
+ gpr_mu_lock(p->pollset_mu);
+ bool cas_ok = gpr_atm_full_cas(&g_backup_poller, (gpr_atm)p, 0);
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok);
+ }
+ gpr_mu_unlock(p->pollset_mu);
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p);
+ }
+ grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p),
+ GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p,
+ grpc_schedule_on_exec_ctx));
+ } else {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p);
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE);
+ }
+}
+
+static void drop_uncovered(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ backup_poller *p = (backup_poller *)gpr_atm_acq_load(&g_backup_poller);
+ gpr_atm old_count =
+ gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1);
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p uncover cnt %d->%d", p, (int)old_count,
+ (int)old_count - 1);
+ }
+ GPR_ASSERT(old_count != 1);
+}
+
+static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ backup_poller *p;
+ gpr_atm old_count =
+ gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2);
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER: cover cnt %d->%d", (int)old_count,
+ 2 + (int)old_count);
+ }
+ if (old_count == 0) {
+ GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx);
+ p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size());
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p);
+ }
+ grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
+ gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx,
+ GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p,
+ grpc_executor_scheduler(GRPC_EXECUTOR_LONG)),
+ GRPC_ERROR_NONE);
+ } else {
+ while ((p = (backup_poller *)gpr_atm_acq_load(&g_backup_poller)) == NULL) {
+ // spin waiting for backup poller
+ }
+ }
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp);
+ }
+ grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd);
+ if (old_count != 0) {
+ drop_uncovered(exec_ctx, tcp);
+ }
+}
+
+static void notify_on_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp);
+ }
+ GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_done_closure);
+}
+
+static void notify_on_write(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp);
+ }
+ cover_self(exec_ctx, tcp);
+ GRPC_CLOSURE_INIT(&tcp->write_done_closure,
+ tcp_drop_uncovered_then_handle_write, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_done_closure);
+}
+
+static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx *exec_ctx,
+ void *arg, grpc_error *error) {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error));
+ }
+ drop_uncovered(exec_ctx, (grpc_tcp *)arg);
+ tcp_handle_write(exec_ctx, arg, error);
+}
+
static void add_to_estimate(grpc_tcp *tcp, size_t bytes) {
tcp->bytes_read_this_round += (double)bytes;
}
@@ -214,6 +357,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
grpc_closure *cb = tcp->read_cb;
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
size_t i;
const char *str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "read: error=%s", str);
@@ -271,7 +415,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if (errno == EAGAIN) {
finish_estimate(tcp);
/* We've consumed the edge, request a new one */
- grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
+ notify_on_read(exec_ctx, tcp);
} else {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
tcp->incoming_buffer);
@@ -308,6 +452,10 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp,
+ grpc_error_string(error));
+ }
if (error != GRPC_ERROR_NONE) {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
@@ -323,9 +471,15 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
size_t target_read_size = get_target_read_size(tcp);
if (tcp->incoming_buffer->length < target_read_size &&
tcp->incoming_buffer->count < MAX_READ_IOVEC) {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p alloc_slices", tcp);
+ }
grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator,
target_read_size, 1, tcp->incoming_buffer);
} else {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p do_read", tcp);
+ }
tcp_do_read(exec_ctx, tcp);
}
}
@@ -334,6 +488,9 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p got_read: %s", tcp, grpc_error_string(error));
+ }
if (error != GRPC_ERROR_NONE) {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
@@ -357,9 +514,9 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = false;
- grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
+ notify_on_read(exec_ctx, tcp);
} else {
- GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_done_closure, GRPC_ERROR_NONE);
}
}
@@ -472,7 +629,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_log(GPR_DEBUG, "write: delayed");
}
- grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
+ notify_on_write(exec_ctx, tcp);
} else {
cb = tcp->write_cb;
tcp->write_cb = NULL;
@@ -525,7 +682,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_log(GPR_DEBUG, "write: delayed");
}
- grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
+ notify_on_write(exec_ctx, tcp);
} else {
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
const char *str = grpc_error_string(error);
@@ -602,7 +759,7 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd,
strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_ref_internal(
- channel_args->args[i].value.pointer.p);
+ (grpc_resource_quota *)channel_args->args[i].value.pointer.p);
}
}
}
@@ -631,10 +788,6 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd,
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
- GRPC_CLOSURE_INIT(&tcp->read_closure, tcp_handle_read, tcp,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&tcp->write_closure, tcp_handle_write, tcp,
- grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&tcp->last_read_buffer);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
grpc_resource_user_slice_allocator_init(
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index ea9608f444..975d599523 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -128,13 +128,11 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx,
GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, error);
}
-static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- security_handshaker *h = arg;
- gpr_mu_lock(&h->mu);
+static void on_peer_checked_inner(grpc_exec_ctx *exec_ctx,
+ security_handshaker *h, grpc_error *error) {
if (error != GRPC_ERROR_NONE || h->shutdown) {
security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error));
- goto done;
+ return;
}
// Create zero-copy frame protector, if implemented.
tsi_zero_copy_grpc_protector *zero_copy_protector = NULL;
@@ -146,7 +144,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
"Zero-copy frame protector creation failed"),
result);
security_handshake_failed_locked(exec_ctx, h, error);
- goto done;
+ return;
}
// Create frame protector if zero-copy frame protector is NULL.
tsi_frame_protector *protector = NULL;
@@ -158,7 +156,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
"Frame protector creation failed"),
result);
security_handshake_failed_locked(exec_ctx, h, error);
- goto done;
+ return;
}
}
// Get unused bytes.
@@ -192,7 +190,13 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
// Set shutdown to true so that subsequent calls to
// security_handshaker_shutdown() do nothing.
h->shutdown = true;
-done:
+}
+
+static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ security_handshaker *h = (security_handshaker *)arg;
+ gpr_mu_lock(&h->mu);
+ on_peer_checked_inner(exec_ctx, h, error);
gpr_mu_unlock(&h->mu);
security_handshaker_unref(exec_ctx, h);
}
@@ -254,7 +258,7 @@ static grpc_error *on_handshake_next_done_locked(
static void on_handshake_next_done_grpc_wrapper(
tsi_result result, void *user_data, const unsigned char *bytes_to_send,
size_t bytes_to_send_size, tsi_handshaker_result *handshaker_result) {
- security_handshaker *h = user_data;
+ security_handshaker *h = (security_handshaker *)user_data;
// This callback will be invoked by TSI in a non-grpc thread, so it's
// safe to create our own exec_ctx here.
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -296,7 +300,7 @@ static grpc_error *do_handshaker_next_locked(
static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
- security_handshaker *h = arg;
+ security_handshaker *h = (security_handshaker *)arg;
gpr_mu_lock(&h->mu);
if (error != GRPC_ERROR_NONE || h->shutdown) {
security_handshake_failed_locked(
@@ -313,7 +317,8 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
bytes_received_size += GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]);
}
if (bytes_received_size > h->handshake_buffer_size) {
- h->handshake_buffer = gpr_realloc(h->handshake_buffer, bytes_received_size);
+ h->handshake_buffer =
+ (uint8_t *)gpr_realloc(h->handshake_buffer, bytes_received_size);
h->handshake_buffer_size = bytes_received_size;
}
size_t offset = 0;
@@ -338,7 +343,7 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- security_handshaker *h = arg;
+ security_handshaker *h = (security_handshaker *)arg;
gpr_mu_lock(&h->mu);
if (error != GRPC_ERROR_NONE || h->shutdown) {
security_handshake_failed_locked(
@@ -415,14 +420,15 @@ static const grpc_handshaker_vtable security_handshaker_vtable = {
static grpc_handshaker *security_handshaker_create(
grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
grpc_security_connector *connector) {
- security_handshaker *h = gpr_zalloc(sizeof(security_handshaker));
+ security_handshaker *h =
+ (security_handshaker *)gpr_zalloc(sizeof(security_handshaker));
grpc_handshaker_init(&security_handshaker_vtable, &h->base);
h->handshaker = handshaker;
h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake");
gpr_mu_init(&h->mu);
gpr_ref_init(&h->refs, 1);
h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
- h->handshake_buffer = gpr_malloc(h->handshake_buffer_size);
+ h->handshake_buffer = (uint8_t *)gpr_malloc(h->handshake_buffer_size);
GRPC_CLOSURE_INIT(&h->on_handshake_data_sent_to_peer,
on_handshake_data_sent_to_peer, h,
grpc_schedule_on_exec_ctx);
@@ -465,7 +471,7 @@ static const grpc_handshaker_vtable fail_handshaker_vtable = {
fail_handshaker_do_handshake};
static grpc_handshaker *fail_handshaker_create() {
- grpc_handshaker *h = gpr_malloc(sizeof(*h));
+ grpc_handshaker *h = (grpc_handshaker *)gpr_malloc(sizeof(*h));
grpc_handshaker_init(&fail_handshaker_vtable, h);
return h;
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index a8eb1b0425..f95cc10a6a 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1117,9 +1117,11 @@ void grpc_server_start(grpc_server *server) {
server_ref(server);
server->starting = true;
- GRPC_CLOSURE_SCHED(&exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server,
- grpc_executor_scheduler),
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(
+ &exec_ctx,
+ GRPC_CLOSURE_CREATE(start_listeners, server,
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
+ GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 5f6302ad00..caa11a956e 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -72,7 +72,8 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
cope with.
Throw this over to the executor (on a core-owned thread) and process it
there. */
- refcount->destroy.scheduler = grpc_executor_scheduler;
+ refcount->destroy.scheduler =
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
}
GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
}