aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/http_connect_handshaker.c12
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c223
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.c5
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h29
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c12
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c47
-rw-r--r--src/core/lib/http/httpcli.c13
-rw-r--r--src/core/lib/iomgr/combiner.c3
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c12
-rw-r--r--src/core/lib/iomgr/executor.c158
-rw-r--r--src/core/lib/iomgr/executor.h7
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c2
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c2
-rw-r--r--src/core/lib/iomgr/tcp_posix.c176
-rw-r--r--src/core/lib/security/transport/security_handshaker.c34
-rw-r--r--src/core/lib/surface/server.c8
-rw-r--r--src/core/lib/transport/transport.c3
17 files changed, 534 insertions, 212 deletions
diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.c b/src/core/ext/filters/client_channel/http_connect_handshaker.c
index 0952dc6d4e..418bb41ef6 100644
--- a/src/core/ext/filters/client_channel/http_connect_handshaker.c
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.c
@@ -124,7 +124,7 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx,
// Callback invoked when finished writing HTTP CONNECT request.
static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- http_connect_handshaker* handshaker = arg;
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)arg;
gpr_mu_lock(&handshaker->mu);
if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
// If the write failed or we're shutting down, clean up and invoke the
@@ -145,7 +145,7 @@ static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
// Callback invoked for reading HTTP CONNECT response.
static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- http_connect_handshaker* handshaker = arg;
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)arg;
gpr_mu_lock(&handshaker->mu);
if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
// If the read failed or we're shutting down, clean up and invoke the
@@ -281,7 +281,8 @@ static void http_connect_handshaker_do_handshake(
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
gpr_string_split(arg->value.string, "\n", &header_strings,
&num_header_strings);
- headers = gpr_malloc(sizeof(grpc_http_header) * num_header_strings);
+ headers = (grpc_http_header*)gpr_malloc(sizeof(grpc_http_header) *
+ num_header_strings);
for (size_t i = 0; i < num_header_strings; ++i) {
char* sep = strchr(header_strings[i], ':');
if (sep == NULL) {
@@ -308,7 +309,7 @@ static void http_connect_handshaker_do_handshake(
grpc_httpcli_request request;
memset(&request, 0, sizeof(request));
request.host = server_name;
- request.http.method = "CONNECT";
+ request.http.method = (char*)"CONNECT";
request.http.path = server_name;
request.http.hdrs = headers;
request.http.hdr_count = num_headers;
@@ -333,7 +334,8 @@ static const grpc_handshaker_vtable http_connect_handshaker_vtable = {
http_connect_handshaker_do_handshake};
static grpc_handshaker* grpc_http_connect_handshaker_create() {
- http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker));
+ http_connect_handshaker* handshaker =
+ (http_connect_handshaker*)gpr_malloc(sizeof(*handshaker));
memset(handshaker, 0, sizeof(*handshaker));
grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base);
gpr_mu_init(&handshaker->mu);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 7541bd5c92..9d9e4ea2ba 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -83,8 +83,6 @@ grpc_tracer_flag grpc_trace_chttp2_refcount =
GRPC_TRACER_INITIALIZER(false, "chttp2_refcount");
#endif
-static const grpc_transport_vtable vtable;
-
/* forward declarations of various callbacks that we'll build closures around */
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
@@ -247,6 +245,8 @@ void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
#endif
+static const grpc_transport_vtable *get_vtable(void);
+
static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
const grpc_channel_args *channel_args,
grpc_endpoint *ep, bool is_client) {
@@ -256,7 +256,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
- t->base.vtable = &vtable;
+ t->base.vtable = get_vtable();
t->ep = ep;
/* one ref is for destroy */
gpr_ref_init(&t->refs, 1);
@@ -556,11 +556,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
- GRPC_CLOSURE_INIT(&t->write_action, write_action, t,
- t->opt_target == GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT
- ? grpc_executor_scheduler
- : grpc_schedule_on_exec_ctx);
-
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
t->ping_state.is_delayed_ping_timer_set = false;
@@ -588,7 +583,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
- grpc_chttp2_transport *t = tp;
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
t->destroying = 1;
close_transport_locked(
exec_ctx, t,
@@ -714,7 +709,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_error *error) {
- grpc_chttp2_stream *s = sp;
+ grpc_chttp2_stream *s = (grpc_chttp2_stream *)sp;
grpc_chttp2_transport *t = s->t;
GPR_TIMER_BEGIN("destroy_stream", 0);
@@ -798,7 +793,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t,
uint32_t id) {
- return grpc_chttp2_stream_map_find(&t->stream_map, id);
+ return (grpc_chttp2_stream *)grpc_chttp2_stream_map_find(&t->stream_map, id);
}
grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,
@@ -857,6 +852,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
+ t->is_first_write_in_batch = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
GRPC_CLOSURE_SCHED(
exec_ctx,
@@ -875,52 +871,91 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
-void grpc_chttp2_become_writable(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_chttp2_stream_write_type stream_write_type, const char *reason) {
+void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ bool also_initiate_write, const char *reason) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
}
- switch (stream_write_type) {
- case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK:
- break;
- case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED:
- grpc_chttp2_initiate_write(exec_ctx, t, reason);
- break;
- case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED:
- grpc_chttp2_initiate_write(exec_ctx, t, reason);
- break;
+ if (also_initiate_write) {
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
+ }
+}
+
+static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t,
+ bool early_results_scheduled,
+ bool partial_write) {
+ /* if it's not the first write in a batch, always offload to the executor:
+ we'll probably end up queuing against the kernel anyway, so we'll likely
+ get better latency overall if we switch writing work elsewhere and continue
+ with application work above */
+ if (!t->is_first_write_in_batch) {
+ return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
}
+ /* equivalently, if it's a partial write, we *know* we're going to be taking a
+ thread jump to write it because of the above, may as well do so
+ immediately */
+ if (partial_write) {
+ return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
+ }
+ switch (t->opt_target) {
+ case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
+ /* executor gives us the largest probability of being able to batch a
+ * write with others on this transport */
+ return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
+ case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
+ return grpc_schedule_on_exec_ctx;
+ }
+ GPR_UNREACHABLE_CODE(return NULL);
+}
+
+#define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i))
+static const char *begin_writing_desc(bool partial, bool inlined) {
+ switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) {
+ case WRITE_STATE_TUPLE_TO_INT(false, false):
+ return "begin write in background";
+ case WRITE_STATE_TUPLE_TO_INT(false, true):
+ return "begin write in current thread";
+ case WRITE_STATE_TUPLE_TO_INT(true, false):
+ return "begin partial write in background";
+ case WRITE_STATE_TUPLE_TO_INT(true, true):
+ return "begin partial write in current thread";
+ }
+ GPR_UNREACHABLE_CODE(return "bad state tuple");
}
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("write_action_begin_locked", 0);
- grpc_chttp2_transport *t = gt;
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
- switch (t->closed ? GRPC_CHTTP2_NOTHING_TO_WRITE
- : grpc_chttp2_begin_write(exec_ctx, t)) {
- case GRPC_CHTTP2_NOTHING_TO_WRITE:
- set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
- "begin writing nothing");
- GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
- break;
- case GRPC_CHTTP2_PARTIAL_WRITE:
- set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
- "begin writing partial");
- GRPC_CLOSURE_SCHED(exec_ctx, &t->write_action, GRPC_ERROR_NONE);
- break;
- case GRPC_CHTTP2_FULL_WRITE:
- set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
- "begin writing");
- GRPC_CLOSURE_SCHED(exec_ctx, &t->write_action, GRPC_ERROR_NONE);
- break;
+ grpc_chttp2_begin_write_result r;
+ if (t->closed) {
+ r.writing = false;
+ } else {
+ r = grpc_chttp2_begin_write(exec_ctx, t);
+ }
+ if (r.writing) {
+ grpc_closure_scheduler *scheduler =
+ write_scheduler(t, r.early_results_scheduled, r.partial);
+ set_write_state(
+ exec_ctx, t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
+ : GRPC_CHTTP2_WRITE_STATE_WRITING,
+ begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx));
+ GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_INIT(&t->write_action,
+ write_action, t, scheduler),
+ GRPC_ERROR_NONE);
+ } else {
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
+ "begin writing nothing");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
}
GPR_TIMER_END("write_action_begin_locked", 0);
}
static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
- grpc_chttp2_transport *t = gt;
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
GPR_TIMER_BEGIN("write_action", 0);
grpc_endpoint_write(
exec_ctx, t->ep, &t->outbuf,
@@ -932,7 +967,7 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
GPR_TIMER_BEGIN("terminate_writing_with_lock", 0);
- grpc_chttp2_transport *t = tp;
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
if (error != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
@@ -957,7 +992,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
- "continue writing [!covered]");
+ "continue writing");
+ t->is_first_write_in_batch = false;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
GRPC_CLOSURE_RUN(
exec_ctx,
@@ -1059,9 +1095,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
post_destructive_reclaimer(exec_ctx, t);
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
- "new_stream");
+ grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream");
}
/* cancel out streams that will never be started */
while (t->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -1110,12 +1144,14 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
if (GRPC_TRACER_ON(grpc_http_trace)) {
const char *errstr = grpc_error_string(error);
- gpr_log(GPR_DEBUG,
- "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s",
- closure,
- (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT),
- (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT),
- desc, errstr);
+ gpr_log(
+ GPR_DEBUG,
+ "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
+ "write_state=%s",
+ t, closure,
+ (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT),
+ (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), desc,
+ errstr, write_state_name(t->write_state));
}
if (error != GRPC_ERROR_NONE) {
if (closure->error_data.error == GRPC_ERROR_NONE) {
@@ -1156,9 +1192,7 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s) {
if (s->id != 0 && (!s->write_buffering ||
s->flow_controlled_buffer.length > t->write_buffer_size)) {
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
- "op.send_message");
+ grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
}
}
@@ -1190,15 +1224,19 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
} else {
grpc_chttp2_write_cb *cb = t->write_cb_pool;
if (cb == NULL) {
- cb = gpr_malloc(sizeof(*cb));
+ cb = (grpc_chttp2_write_cb *)gpr_malloc(sizeof(*cb));
} else {
t->write_cb_pool = cb->next;
}
cb->call_at_byte = notify_offset;
cb->closure = s->fetching_send_message_finished;
s->fetching_send_message_finished = NULL;
- cb->next = s->on_write_finished_cbs;
- s->on_write_finished_cbs = cb;
+ grpc_chttp2_write_cb **list =
+ s->fetching_send_message->flags & GRPC_WRITE_THROUGH
+ ? &s->on_write_finished_cbs
+ : &s->on_flow_controlled_cbs;
+ cb->next = *list;
+ *list = cb;
}
s->fetching_send_message = NULL;
return; /* early out */
@@ -1218,7 +1256,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_error *error) {
- grpc_chttp2_stream *s = gs;
+ grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
grpc_chttp2_transport *t = s->t;
if (error == GRPC_ERROR_NONE) {
error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
@@ -1253,8 +1291,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
- grpc_transport_stream_op_batch *op = stream_op;
- grpc_chttp2_stream *s = op->handler_private.extra_arg;
+ grpc_transport_stream_op_batch *op =
+ (grpc_transport_stream_op_batch *)stream_op;
+ grpc_chttp2_stream *s = (grpc_chttp2_stream *)op->handler_private.extra_arg;
grpc_transport_stream_op_batch_payload *op_payload = op->payload;
grpc_chttp2_transport *t = s->t;
@@ -1350,14 +1389,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
} else {
GPR_ASSERT(s->id != 0);
- grpc_chttp2_stream_write_type write_type =
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
+ bool initiate_write = true;
if (op->send_message &&
(op->payload->send_message.send_message->flags &
GRPC_WRITE_BUFFER_HINT)) {
- write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK;
+ initiate_write = false;
}
- grpc_chttp2_become_writable(exec_ctx, t, s, write_type,
+ grpc_chttp2_become_writable(exec_ctx, t, s, initiate_write,
"op.send_initial_metadata");
}
} else {
@@ -1451,8 +1489,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else if (s->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ grpc_chttp2_become_writable(exec_ctx, t, s, true,
"op.send_trailing_metadata");
}
}
@@ -1572,7 +1609,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
- grpc_chttp2_transport *t = tp;
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
t->ping_state.is_delayed_ping_timer_set = false;
grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping");
}
@@ -1624,8 +1661,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
void *stream_op,
grpc_error *error_ignored) {
- grpc_transport_op *op = stream_op;
- grpc_chttp2_transport *t = op->handler_private.extra_arg;
+ grpc_transport_op *op = (grpc_transport_op *)stream_op;
+ grpc_chttp2_transport *t =
+ (grpc_chttp2_transport *)op->handler_private.extra_arg;
grpc_error *close_transport = op->disconnect_with_error;
if (op->goaway_error) {
@@ -1838,7 +1876,8 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint32_t id, grpc_error *error) {
- grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
+ grpc_chttp2_stream *s =
+ (grpc_chttp2_stream *)grpc_chttp2_stream_map_delete(&t->stream_map, id);
GPR_ASSERT(s);
if (t->incoming_stream == s) {
t->incoming_stream = NULL;
@@ -1969,6 +2008,21 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error;
}
+static void flush_write_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, grpc_chttp2_write_cb **list,
+ grpc_error *error) {
+ while (*list) {
+ grpc_chttp2_write_cb *cb = *list;
+ *list = cb->next;
+ grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure,
+ GRPC_ERROR_REF(error),
+ "on_write_finished_cb");
+ cb->next = t->write_cb_pool;
+ t->write_cb_pool = cb;
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error) {
@@ -1988,16 +2042,9 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error),
"fetching_send_message_finished");
- while (s->on_write_finished_cbs) {
- grpc_chttp2_write_cb *cb = s->on_write_finished_cbs;
- s->on_write_finished_cbs = cb->next;
- grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure,
- GRPC_ERROR_REF(error),
- "on_write_finished_cb");
- cb->next = t->write_cb_pool;
- t->write_cb_pool = cb;
- }
- GRPC_ERROR_UNREF(error);
+ flush_write_list(exec_ctx, t, s, &s->on_write_finished_cbs,
+ GRPC_ERROR_REF(error));
+ flush_write_list(exec_ctx, t, s, &s->on_flow_controlled_cbs, error);
}
void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
@@ -2241,13 +2288,11 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
break;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ grpc_chttp2_become_writable(exec_ctx, t, s, true,
"immediate stream flowctl");
break;
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
- grpc_chttp2_become_writable(exec_ctx, t, s,
- GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,
+ grpc_chttp2_become_writable(exec_ctx, t, s, false,
"queue stream flowctl");
break;
}
@@ -2360,9 +2405,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (t->flow_control.initial_window_update > 0) {
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
- grpc_chttp2_become_writable(
- exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
- "unstalled");
+ grpc_chttp2_become_writable(exec_ctx, t, s, true, "unstalled");
}
}
t->flow_control.initial_window_update = 0;
@@ -2959,6 +3002,8 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
chttp2_get_peer,
chttp2_get_endpoint};
+static const grpc_transport_vtable *get_vtable(void) { return &vtable; }
+
grpc_transport *grpc_create_chttp2_transport(
grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
grpc_endpoint *ep, int is_client) {
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c
index 65f3b01d77..164a97d039 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.c
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c
@@ -98,9 +98,8 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
grpc_chttp2_flowctl_recv_stream_update(
&t->flow_control, &s->flow_control, received_update);
if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
- grpc_chttp2_become_writable(
- exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
- "stream.read_flow_control");
+ grpc_chttp2_become_writable(exec_ctx, t, s, true,
+ "stream.read_flow_control");
}
}
} else {
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 3c41a8958f..f0962a6b0e 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -262,6 +262,10 @@ struct grpc_chttp2_transport {
/** write execution state of the transport */
grpc_chttp2_write_state write_state;
+ /** is this the first write in a series of writes?
+ set when we initiate writing from idle, cleared when we
+ initiate writing from writing+more */
+ bool is_first_write_in_batch;
/** is the transport destroying itself? */
uint8_t destroying;
@@ -483,6 +487,7 @@ struct grpc_chttp2_stream {
grpc_slice fetching_slice;
int64_t next_message_end_offset;
int64_t flow_controlled_bytes_written;
+ int64_t flow_controlled_bytes_flowed;
grpc_closure complete_fetch_locked;
grpc_closure *fetching_send_message_finished;
@@ -553,6 +558,7 @@ struct grpc_chttp2_stream {
grpc_slice_buffer flow_controlled_buffer;
+ grpc_chttp2_write_cb *on_flow_controlled_cbs;
grpc_chttp2_write_cb *on_write_finished_cbs;
grpc_chttp2_write_cb *finish_after_write;
size_t sending_bytes;
@@ -593,10 +599,13 @@ struct grpc_chttp2_stream {
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, const char *reason);
-typedef enum {
- GRPC_CHTTP2_NOTHING_TO_WRITE,
- GRPC_CHTTP2_PARTIAL_WRITE,
- GRPC_CHTTP2_FULL_WRITE,
+typedef struct {
+ /** are we writing? */
+ bool writing;
+ /** if writing: was it a complete flush (false) or a partial flush (true) */
+ bool partial;
+ /** did we queue any completions as part of beginning the write */
+ bool early_results_scheduled;
} grpc_chttp2_begin_write_result;
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
@@ -838,22 +847,12 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
-typedef enum {
- /* don't initiate a transport write, but piggyback on the next one */
- GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,
- /* initiate a covered write */
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
- /* initiate an uncovered write */
- GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED
-} grpc_chttp2_stream_write_type;
-
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
- grpc_chttp2_stream_write_type type,
- const char *reason);
+ bool also_initiate_write, const char *reason);
void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 18d163ee98..608c94aa30 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -106,7 +106,8 @@ grpc_error *grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
return err;
}
++cur;
- ++t->deframe_state;
+ t->deframe_state =
+ (grpc_chttp2_deframe_transport_state)(1 + (int)t->deframe_state);
}
if (cur == end) {
return GRPC_ERROR_NONE;
@@ -402,7 +403,7 @@ static void free_timeout(void *p) { gpr_free(p); }
static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
grpc_mdelem md) {
- grpc_chttp2_transport *t = tp;
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
grpc_chttp2_stream *s = t->incoming_stream;
GPR_TIMER_BEGIN("on_initial_header", 0);
@@ -426,11 +427,12 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
}
if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_TIMEOUT)) {
- gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
+ gpr_timespec *cached_timeout =
+ (gpr_timespec *)grpc_mdelem_get_user_data(md, free_timeout);
gpr_timespec timeout;
if (cached_timeout == NULL) {
/* not already parsed: parse it now, and store the result away */
- cached_timeout = gpr_malloc(sizeof(gpr_timespec));
+ cached_timeout = (gpr_timespec *)gpr_malloc(sizeof(gpr_timespec));
if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), cached_timeout)) {
char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val);
@@ -482,7 +484,7 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp,
grpc_mdelem md) {
- grpc_chttp2_transport *t = tp;
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
grpc_chttp2_stream *s = t->incoming_stream;
GPR_TIMER_BEGIN("on_trailing_header", 0);
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 711938b278..8abddd778c 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -121,15 +121,18 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
(t->ping_state.pings_before_data_required != 0);
}
-static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+static bool update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, int64_t send_bytes,
- grpc_chttp2_write_cb **list, grpc_error *error) {
+ grpc_chttp2_write_cb **list, int64_t *ctr,
+ grpc_error *error) {
+ bool sched_any = false;
grpc_chttp2_write_cb *cb = *list;
*list = NULL;
- s->flow_controlled_bytes_written += send_bytes;
+ *ctr += send_bytes;
while (cb) {
grpc_chttp2_write_cb *next = cb->next;
- if (cb->call_at_byte <= s->flow_controlled_bytes_written) {
+ if (cb->call_at_byte <= *ctr) {
+ sched_any = true;
finish_write_cb(exec_ctx, t, s, cb, GRPC_ERROR_REF(error));
} else {
add_to_write_list(list, cb);
@@ -137,6 +140,7 @@ static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
cb = next;
}
GRPC_ERROR_UNREF(error);
+ return sched_any;
}
static bool stream_ref_if_not_destroyed(gpr_refcount *r) {
@@ -192,13 +196,13 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
}
}
- bool partial_write = false;
+ grpc_chttp2_begin_write_result result = {false, false, false};
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
while (true) {
if (t->outbuf.length > target_write_size(t)) {
- partial_write = true;
+ result.partial = true;
break;
}
@@ -242,7 +246,6 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
.stats = &s->stats.outgoing};
grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor, NULL, 0,
s->send_initial_metadata, &hopt, &t->outbuf);
- now_writing = true;
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
@@ -269,6 +272,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
s->send_initial_metadata = NULL;
s->sent_initial_metadata = true;
sent_initial_metadata = true;
+ result.early_results_scheduled = true;
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_NONE,
+ "send_initial_metadata_finished");
}
/* send any window updates */
uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update(
@@ -302,6 +309,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (max_outgoing > 0) {
bool is_last_data_frame = false;
bool is_last_frame = false;
+ size_t sending_bytes_before = s->sending_bytes;
if (s->stream_compression_send_enabled) {
while ((s->flow_controlled_buffer.length > 0 ||
s->compressed_data_buffer->length > 0) &&
@@ -369,6 +377,11 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
&s->stats.outgoing));
}
}
+ result.early_results_scheduled |=
+ update_list(exec_ctx, t, s,
+ (int64_t)(s->sending_bytes - sending_bytes_before),
+ &s->on_flow_controlled_cbs,
+ &s->flow_controlled_bytes_flowed, GRPC_ERROR_NONE);
now_writing = true;
if (s->flow_controlled_buffer.length > 0 ||
(s->stream_compression_send_enabled &&
@@ -420,6 +433,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing));
}
now_writing = true;
+ result.early_results_scheduled = true;
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, t, s, &s->send_trailing_metadata_finished,
+ GRPC_ERROR_NONE, "send_trailing_metadata_finished");
}
}
@@ -461,9 +478,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
GPR_TIMER_END("grpc_chttp2_begin_write", 0);
- return t->outbuf.count > 0 ? (partial_write ? GRPC_CHTTP2_PARTIAL_WRITE
- : GRPC_CHTTP2_FULL_WRITE)
- : GRPC_CHTTP2_NOTHING_TO_WRITE;
+ result.writing = t->outbuf.count > 0;
+ return result;
}
void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -472,20 +488,13 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
- if (s->sent_initial_metadata) {
- grpc_chttp2_complete_closure_step(
- exec_ctx, t, s, &s->send_initial_metadata_finished,
- GRPC_ERROR_REF(error), "send_initial_metadata_finished");
- }
if (s->sending_bytes != 0) {
update_list(exec_ctx, t, s, (int64_t)s->sending_bytes,
- &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
+ &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
+ GRPC_ERROR_REF(error));
s->sending_bytes = 0;
}
if (s->sent_trailing_metadata) {
- grpc_chttp2_complete_closure_step(
- exec_ctx, t, s, &s->send_trailing_metadata_finished,
- GRPC_ERROR_REF(error), "send_trailing_metadata_finished");
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1,
GRPC_ERROR_REF(error));
}
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index 77af7b7c08..84cc39604c 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -130,7 +130,7 @@ static void do_read(grpc_exec_ctx *exec_ctx, internal_request *req) {
static void on_read(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_error *error) {
- internal_request *req = user_data;
+ internal_request *req = (internal_request *)user_data;
size_t i;
for (i = 0; i < req->incoming.count; i++) {
@@ -159,7 +159,7 @@ static void on_written(grpc_exec_ctx *exec_ctx, internal_request *req) {
}
static void done_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- internal_request *req = arg;
+ internal_request *req = (internal_request *)arg;
if (error == GRPC_ERROR_NONE) {
on_written(exec_ctx, req);
} else {
@@ -175,7 +175,7 @@ static void start_write(grpc_exec_ctx *exec_ctx, internal_request *req) {
static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep) {
- internal_request *req = arg;
+ internal_request *req = (internal_request *)arg;
if (!ep) {
next_address(exec_ctx, req, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@@ -189,7 +189,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
static void on_connected(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- internal_request *req = arg;
+ internal_request *req = (internal_request *)arg;
if (!req->ep) {
next_address(exec_ctx, req, GRPC_ERROR_REF(error));
@@ -226,7 +226,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req,
}
static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- internal_request *req = arg;
+ internal_request *req = (internal_request *)arg;
if (error != GRPC_ERROR_NONE) {
finish(exec_ctx, req, GRPC_ERROR_REF(error));
return;
@@ -243,7 +243,8 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
gpr_timespec deadline, grpc_closure *on_done,
grpc_httpcli_response *response,
const char *name, grpc_slice request_text) {
- internal_request *req = gpr_malloc(sizeof(internal_request));
+ internal_request *req =
+ (internal_request *)gpr_malloc(sizeof(internal_request));
memset(req, 0, sizeof(*req));
req->request_text = request_text;
grpc_http_parser_init(&req->parser, GRPC_HTTP_RESPONSE, response);
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 9b66987b68..0e496829f6 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -80,7 +80,8 @@ grpc_combiner *grpc_combiner_create(void) {
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
- GRPC_CLOSURE_INIT(&lock->offload, offload, lock, grpc_executor_scheduler);
+ GRPC_CLOSURE_INIT(&lock->offload, offload, lock,
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index fbd265f3ce..ebf22a8d9d 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -988,6 +988,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
r = grpc_poll_function(pfds, pfd_count, timeout);
GRPC_SCHEDULING_END_BLOCKING_REGION;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r);
+ }
+
if (r < 0) {
if (errno != EINTR) {
work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
@@ -1008,6 +1012,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "%p: got_wakeup", pollset);
+ }
work_combine_error(
&error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
}
@@ -1015,6 +1022,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (watchers[i].fd == NULL) {
fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
} else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "%p got_event: %d r:%d w:%d [%d]", pollset,
+ pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
+ (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
+ }
fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
pfds[i].revents & POLLOUT_CHECK, pollset);
}
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 7621a7fe75..8496ce23c0 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -39,6 +39,7 @@ typedef struct {
grpc_closure_list elems;
size_t depth;
bool shutdown;
+ bool queued_long_job;
gpr_thd_id id;
} thread_state;
@@ -49,6 +50,9 @@ static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
GPR_TLS_DECL(g_this_thread_state);
+static grpc_tracer_flag executor_trace =
+ GRPC_TRACER_INITIALIZER(false, "executor");
+
static void executor_thread(void *arg);
static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
@@ -58,6 +62,14 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
while (c != NULL) {
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
+ if (GRPC_TRACER_ON(executor_trace)) {
+#ifndef NDEBUG
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
+ c->file_created, c->line_created);
+#else
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
+#endif
+ }
#ifndef NDEBUG
c->scheduled = false;
#endif
@@ -65,6 +77,7 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
GRPC_ERROR_UNREF(error);
c = next;
n++;
+ grpc_exec_ctx_flush(exec_ctx);
}
return n;
@@ -119,6 +132,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
}
void grpc_executor_init(grpc_exec_ctx *exec_ctx) {
+ grpc_register_tracer(&executor_trace);
gpr_atm_no_barrier_store(&g_cur_threads, 0);
grpc_executor_set_threading(exec_ctx, true);
}
@@ -136,60 +150,134 @@ static void executor_thread(void *arg) {
size_t subtract_depth = 0;
for (;;) {
+ if (GRPC_TRACER_ON(executor_trace)) {
+ gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
+ (int)(ts - g_thread_state), subtract_depth);
+ }
gpr_mu_lock(&ts->mu);
ts->depth -= subtract_depth;
while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
+ ts->queued_long_job = false;
gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
if (ts->shutdown) {
+ if (GRPC_TRACER_ON(executor_trace)) {
+ gpr_log(GPR_DEBUG, "EXECUTOR[%d]: shutdown",
+ (int)(ts - g_thread_state));
+ }
gpr_mu_unlock(&ts->mu);
break;
}
grpc_closure_list exec = ts->elems;
ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
+ if (GRPC_TRACER_ON(executor_trace)) {
+ gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state));
+ }
subtract_depth = run_closures(&exec_ctx, exec);
- grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
}
static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
- if (cur_thread_count == 0) {
- grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
- return;
- }
- thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
- if (ts == NULL) {
- ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
- }
- gpr_mu_lock(&ts->mu);
- if (grpc_closure_list_empty(ts->elems)) {
- gpr_cv_signal(&ts->cv);
- }
- grpc_closure_list_append(&ts->elems, closure, error);
- ts->depth++;
- bool try_new_thread = ts->depth > MAX_DEPTH &&
- cur_thread_count < g_max_threads && !ts->shutdown;
- gpr_mu_unlock(&ts->mu);
- if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
- cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
- if (cur_thread_count < g_max_threads) {
- gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
-
- gpr_thd_options opt = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&opt);
- gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
- &g_thread_state[cur_thread_count], &opt);
+ grpc_error *error, bool is_short) {
+ bool retry_push;
+ do {
+ retry_push = false;
+ size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
+ if (cur_thread_count == 0) {
+ if (GRPC_TRACER_ON(executor_trace)) {
+#ifndef NDEBUG
+ gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
+ closure, closure->file_created, closure->line_created);
+#else
+ gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure);
+#endif
+ }
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+ return;
}
- gpr_spinlock_unlock(&g_adding_thread_lock);
- }
+ thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
+ if (ts == NULL) {
+ ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
+ }
+ thread_state *orig_ts = ts;
+
+ bool try_new_thread;
+ for (;;) {
+ if (GRPC_TRACER_ON(executor_trace)) {
+#ifndef NDEBUG
+ gpr_log(
+ GPR_DEBUG,
+ "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d",
+ closure, is_short ? "short" : "long", closure->file_created,
+ closure->line_created, (int)(ts - g_thread_state));
+#else
+ gpr_log(GPR_DEBUG, "EXECUTOR: try to schedule %p (%s) to thread %d",
+ closure, is_short ? "short" : "long",
+ (int)(ts - g_thread_state));
+#endif
+ }
+ gpr_mu_lock(&ts->mu);
+ if (ts->queued_long_job) {
+ gpr_mu_unlock(&ts->mu);
+ size_t idx = (size_t)(ts - g_thread_state);
+ ts = &g_thread_state[(idx + 1) % cur_thread_count];
+ if (ts == orig_ts) {
+ retry_push = true;
+ try_new_thread = true;
+ break;
+ }
+ continue;
+ }
+ if (grpc_closure_list_empty(ts->elems)) {
+ gpr_cv_signal(&ts->cv);
+ }
+ grpc_closure_list_append(&ts->elems, closure, error);
+ ts->depth++;
+ try_new_thread = ts->depth > MAX_DEPTH &&
+ cur_thread_count < g_max_threads && !ts->shutdown;
+ if (!is_short) ts->queued_long_job = true;
+ gpr_mu_unlock(&ts->mu);
+ break;
+ }
+ if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
+ cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
+ if (cur_thread_count < g_max_threads) {
+ gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
+
+ gpr_thd_options opt = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&opt);
+ gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
+ &g_thread_state[cur_thread_count], &opt);
+ }
+ gpr_spinlock_unlock(&g_adding_thread_lock);
+ }
+ } while (retry_push);
}
-static const grpc_closure_scheduler_vtable executor_vtable = {
- executor_push, executor_push, "executor"};
-static grpc_closure_scheduler executor_scheduler = {&executor_vtable};
-grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler;
+static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ executor_push(exec_ctx, closure, error, true);
+}
+
+static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ executor_push(exec_ctx, closure, error, false);
+}
+
+static const grpc_closure_scheduler_vtable executor_vtable_short = {
+ executor_push_short, executor_push_short, "executor"};
+static grpc_closure_scheduler executor_scheduler_short = {
+ &executor_vtable_short};
+
+static const grpc_closure_scheduler_vtable executor_vtable_long = {
+ executor_push_long, executor_push_long, "executor"};
+static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};
+
+grpc_closure_scheduler *grpc_executor_scheduler(
+ grpc_executor_job_length length) {
+ return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
+ : &executor_scheduler_long;
+}
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index c3382a0a12..0412c02790 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -21,6 +21,11 @@
#include "src/core/lib/iomgr/closure.h"
+typedef enum {
+ GRPC_EXECUTOR_SHORT,
+ GRPC_EXECUTOR_LONG
+} grpc_executor_job_length;
+
/** Initialize the global executor.
*
* This mechanism is meant to outsource work (grpc_closure instances) to a
@@ -28,7 +33,7 @@
* non-blocking solution available. */
void grpc_executor_init(grpc_exec_ctx *exec_ctx);
-extern grpc_closure_scheduler *grpc_executor_scheduler;
+grpc_closure_scheduler *grpc_executor_scheduler(grpc_executor_job_length);
/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index 35dedc23de..2bb00e5eed 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -176,7 +176,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_resolved_addresses **addrs) {
request *r = gpr_malloc(sizeof(request));
GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
- grpc_executor_scheduler);
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index 45cfd7248d..0cb0029f4e 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -159,7 +159,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_resolved_addresses **addresses) {
request *r = gpr_malloc(sizeof(request));
GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
- grpc_executor_scheduler);
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 6c620ca245..b15b6d8746 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -43,6 +43,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@@ -90,8 +91,8 @@ typedef struct {
grpc_closure *release_fd_cb;
int *release_fd;
- grpc_closure read_closure;
- grpc_closure write_closure;
+ grpc_closure read_done_closure;
+ grpc_closure write_done_closure;
char *peer_string;
@@ -99,6 +100,145 @@ typedef struct {
grpc_resource_user_slice_allocator slice_allocator;
} grpc_tcp;
+typedef struct backup_poller {
+ gpr_mu *pollset_mu;
+ grpc_closure run_poller;
+} backup_poller;
+
+#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset *)((b) + 1))
+
+static gpr_atm g_uncovered_notifications_pending;
+static gpr_atm g_backup_poller; /* backup_poller* */
+
+static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
+ grpc_error *error);
+static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
+ grpc_error *error);
+static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx *exec_ctx,
+ void *arg /* grpc_tcp */,
+ grpc_error *error);
+
+static void done_poller(grpc_exec_ctx *exec_ctx, void *bp,
+ grpc_error *error_ignored) {
+ backup_poller *p = (backup_poller *)bp;
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p destroy", p);
+ }
+ grpc_pollset_destroy(exec_ctx, BACKUP_POLLER_POLLSET(p));
+ gpr_free(p);
+}
+
+static void run_poller(grpc_exec_ctx *exec_ctx, void *bp,
+ grpc_error *error_ignored) {
+ backup_poller *p = (backup_poller *)bp;
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p);
+ }
+ gpr_mu_lock(p->pollset_mu);
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec deadline =
+ gpr_time_add(now, gpr_time_from_seconds(10, GPR_TIMESPAN));
+ GRPC_LOG_IF_ERROR("backup_poller:pollset_work",
+ grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL,
+ now, deadline));
+ gpr_mu_unlock(p->pollset_mu);
+ /* last "uncovered" notification is the ref that keeps us polling, if we get
+ * there try a cas to release it */
+ if (gpr_atm_no_barrier_load(&g_uncovered_notifications_pending) == 1 &&
+ gpr_atm_full_cas(&g_uncovered_notifications_pending, 1, 0)) {
+ gpr_mu_lock(p->pollset_mu);
+ bool cas_ok = gpr_atm_no_barrier_cas(&g_backup_poller, (gpr_atm)p, 0);
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok);
+ }
+ gpr_mu_unlock(p->pollset_mu);
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p);
+ }
+ grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p),
+ GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p,
+ grpc_schedule_on_exec_ctx));
+ } else {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p);
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE);
+ }
+}
+
+static void drop_uncovered(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ backup_poller *p = (backup_poller *)gpr_atm_no_barrier_load(&g_backup_poller);
+ gpr_atm old_count =
+ gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1);
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p uncover cnt %d->%d", p, (int)old_count,
+ (int)old_count - 1);
+ }
+ GPR_ASSERT(old_count != 1);
+}
+
+static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ backup_poller *p;
+ gpr_atm old_count =
+ gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2);
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER: cover cnt %d->%d", (int)old_count,
+ 2 + (int)old_count);
+ }
+ if (old_count == 0) {
+ p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size());
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p);
+ }
+ grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
+ gpr_atm_no_barrier_store(&g_backup_poller, (gpr_atm)p);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx,
+ GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p,
+ grpc_executor_scheduler(GRPC_EXECUTOR_LONG)),
+ GRPC_ERROR_NONE);
+ } else {
+ p = (backup_poller *)gpr_atm_no_barrier_load(&g_backup_poller);
+ GPR_ASSERT(p != NULL);
+ }
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp);
+ }
+ grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd);
+ if (old_count != 0) {
+ drop_uncovered(exec_ctx, tcp);
+ }
+}
+
+static void notify_on_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp);
+ }
+ GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_done_closure);
+}
+
+static void notify_on_write(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp);
+ }
+ cover_self(exec_ctx, tcp);
+ GRPC_CLOSURE_INIT(&tcp->write_done_closure,
+ tcp_drop_uncovered_then_handle_write, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_done_closure);
+}
+
+static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx *exec_ctx,
+ void *arg, grpc_error *error) {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error));
+ }
+ drop_uncovered(exec_ctx, (grpc_tcp *)arg);
+ tcp_handle_write(exec_ctx, arg, error);
+}
+
static void add_to_estimate(grpc_tcp *tcp, size_t bytes) {
tcp->bytes_read_this_round += (double)bytes;
}
@@ -214,6 +354,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
grpc_closure *cb = tcp->read_cb;
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
size_t i;
const char *str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "read: error=%s", str);
@@ -268,7 +409,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if (errno == EAGAIN) {
finish_estimate(tcp);
/* We've consumed the edge, request a new one */
- grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
+ notify_on_read(exec_ctx, tcp);
} else {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
tcp->incoming_buffer);
@@ -303,7 +444,11 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
grpc_error *error) {
- grpc_tcp *tcp = tcpp;
+ grpc_tcp *tcp = (grpc_tcp *)tcpp;
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp,
+ grpc_error_string(error));
+ }
if (error != GRPC_ERROR_NONE) {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
@@ -319,9 +464,15 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
size_t target_read_size = get_target_read_size(tcp);
if (tcp->incoming_buffer->length < target_read_size &&
tcp->incoming_buffer->count < MAX_READ_IOVEC) {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p alloc_slices", tcp);
+ }
grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator,
target_read_size, 1, tcp->incoming_buffer);
} else {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p do_read", tcp);
+ }
tcp_do_read(exec_ctx, tcp);
}
}
@@ -330,6 +481,9 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "TCP:%p got_read: %s", tcp, grpc_error_string(error));
+ }
if (error != GRPC_ERROR_NONE) {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
@@ -353,9 +507,9 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = false;
- grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
+ notify_on_read(exec_ctx, tcp);
} else {
- GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_done_closure, GRPC_ERROR_NONE);
}
}
@@ -465,7 +619,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_log(GPR_DEBUG, "write: delayed");
}
- grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
+ notify_on_write(exec_ctx, tcp);
} else {
cb = tcp->write_cb;
tcp->write_cb = NULL;
@@ -518,7 +672,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_log(GPR_DEBUG, "write: delayed");
}
- grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
+ notify_on_write(exec_ctx, tcp);
} else {
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
const char *str = grpc_error_string(error);
@@ -595,7 +749,7 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd,
strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_ref_internal(
- channel_args->args[i].value.pointer.p);
+ (grpc_resource_quota *)channel_args->args[i].value.pointer.p);
}
}
}
@@ -624,10 +778,6 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd,
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
- GRPC_CLOSURE_INIT(&tcp->read_closure, tcp_handle_read, tcp,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&tcp->write_closure, tcp_handle_write, tcp,
- grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&tcp->last_read_buffer);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
grpc_resource_user_slice_allocator_init(
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index fc9c9f980f..bf7a5272cb 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -127,13 +127,11 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx,
GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, error);
}
-static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- security_handshaker *h = arg;
- gpr_mu_lock(&h->mu);
+static void on_peer_checked_inner(grpc_exec_ctx *exec_ctx,
+ security_handshaker *h, grpc_error *error) {
if (error != GRPC_ERROR_NONE || h->shutdown) {
security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error));
- goto done;
+ return;
}
// Create frame protector.
tsi_frame_protector *protector;
@@ -144,7 +142,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"),
result);
security_handshake_failed_locked(exec_ctx, h, error);
- goto done;
+ return;
}
// Get unused bytes.
const unsigned char *unused_bytes = NULL;
@@ -177,7 +175,13 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
// Set shutdown to true so that subsequent calls to
// security_handshaker_shutdown() do nothing.
h->shutdown = true;
-done:
+}
+
+static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ security_handshaker *h = (security_handshaker *)arg;
+ gpr_mu_lock(&h->mu);
+ on_peer_checked_inner(exec_ctx, h, error);
gpr_mu_unlock(&h->mu);
security_handshaker_unref(exec_ctx, h);
}
@@ -239,7 +243,7 @@ static grpc_error *on_handshake_next_done_locked(
static void on_handshake_next_done_grpc_wrapper(
tsi_result result, void *user_data, const unsigned char *bytes_to_send,
size_t bytes_to_send_size, tsi_handshaker_result *handshaker_result) {
- security_handshaker *h = user_data;
+ security_handshaker *h = (security_handshaker *)user_data;
// This callback will be invoked by TSI in a non-grpc thread, so it's
// safe to create our own exec_ctx here.
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -281,7 +285,7 @@ static grpc_error *do_handshaker_next_locked(
static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
- security_handshaker *h = arg;
+ security_handshaker *h = (security_handshaker *)arg;
gpr_mu_lock(&h->mu);
if (error != GRPC_ERROR_NONE || h->shutdown) {
security_handshake_failed_locked(
@@ -298,7 +302,8 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
bytes_received_size += GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]);
}
if (bytes_received_size > h->handshake_buffer_size) {
- h->handshake_buffer = gpr_realloc(h->handshake_buffer, bytes_received_size);
+ h->handshake_buffer =
+ (uint8_t *)gpr_realloc(h->handshake_buffer, bytes_received_size);
h->handshake_buffer_size = bytes_received_size;
}
size_t offset = 0;
@@ -323,7 +328,7 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- security_handshaker *h = arg;
+ security_handshaker *h = (security_handshaker *)arg;
gpr_mu_lock(&h->mu);
if (error != GRPC_ERROR_NONE || h->shutdown) {
security_handshake_failed_locked(
@@ -400,14 +405,15 @@ static const grpc_handshaker_vtable security_handshaker_vtable = {
static grpc_handshaker *security_handshaker_create(
grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
grpc_security_connector *connector) {
- security_handshaker *h = gpr_zalloc(sizeof(security_handshaker));
+ security_handshaker *h =
+ (security_handshaker *)gpr_zalloc(sizeof(security_handshaker));
grpc_handshaker_init(&security_handshaker_vtable, &h->base);
h->handshaker = handshaker;
h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake");
gpr_mu_init(&h->mu);
gpr_ref_init(&h->refs, 1);
h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
- h->handshake_buffer = gpr_malloc(h->handshake_buffer_size);
+ h->handshake_buffer = (uint8_t *)gpr_malloc(h->handshake_buffer_size);
GRPC_CLOSURE_INIT(&h->on_handshake_data_sent_to_peer,
on_handshake_data_sent_to_peer, h,
grpc_schedule_on_exec_ctx);
@@ -450,7 +456,7 @@ static const grpc_handshaker_vtable fail_handshaker_vtable = {
fail_handshaker_do_handshake};
static grpc_handshaker *fail_handshaker_create() {
- grpc_handshaker *h = gpr_malloc(sizeof(*h));
+ grpc_handshaker *h = (grpc_handshaker *)gpr_malloc(sizeof(*h));
grpc_handshaker_init(&fail_handshaker_vtable, h);
return h;
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 66dcc299aa..f8c84fb4a8 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1116,9 +1116,11 @@ void grpc_server_start(grpc_server *server) {
server_ref(server);
server->starting = true;
- GRPC_CLOSURE_SCHED(&exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server,
- grpc_executor_scheduler),
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(
+ &exec_ctx,
+ GRPC_CLOSURE_CREATE(start_listeners, server,
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
+ GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 6c61f4b8d9..0ca7688ca4 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -72,7 +72,8 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
cope with.
Throw this over to the executor (on a core-owned thread) and process it
there. */
- refcount->destroy.scheduler = grpc_executor_scheduler;
+ refcount->destroy.scheduler =
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
}
GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
}