aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2016-09-12 11:59:25 -0700
committerGravatar GitHub <noreply@github.com>2016-09-12 11:59:25 -0700
commit6e51f992c6bfdfba61d984ab173305da455bd2e7 (patch)
tree0ca276fbe0dac5c2be06d8ba74ac2193cdd3f550 /src/core/ext
parentec5c93cabfbf535be2528df55ca8bb4500e6bc9b (diff)
parent537f7c2a136641487febeac89a25e430029eb40c (diff)
Merge pull request #8068 from grpc/revert-7279-grand-unified-closures
Revert "Grand unified closures"
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/client_config/client_channel.c14
-rw-r--r--src/core/ext/client_config/subchannel.c18
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c766
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h54
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c12
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c9
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c21
7 files changed, 503 insertions, 391 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index be333f4e0d..61e012578e 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -387,7 +387,7 @@ typedef struct client_channel_call_data {
grpc_connected_subchannel *connected_subchannel;
grpc_polling_entity *pollent;
- grpc_transport_stream_op **waiting_ops;
+ grpc_transport_stream_op *waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
@@ -404,7 +404,7 @@ static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
gpr_realloc(calld->waiting_ops,
calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
}
- calld->waiting_ops[calld->waiting_ops_count++] = op;
+ calld->waiting_ops[calld->waiting_ops_count++] = *op;
GPR_TIMER_END("add_waiting_locked", 0);
}
@@ -413,14 +413,14 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
size_t i;
for (i = 0; i < calld->waiting_ops_count; i++) {
grpc_transport_stream_op_finish_with_failure(
- exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
+ exec_ctx, &calld->waiting_ops[i], GRPC_ERROR_REF(error));
}
calld->waiting_ops_count = 0;
GRPC_ERROR_UNREF(error);
}
typedef struct {
- grpc_transport_stream_op **ops;
+ grpc_transport_stream_op *ops;
size_t nops;
grpc_subchannel_call *call;
} retry_ops_args;
@@ -429,7 +429,7 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]);
+ grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
}
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
gpr_free(a->ops);
@@ -437,10 +437,6 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
}
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
- if (calld->waiting_ops_count == 0) {
- return;
- }
-
retry_ops_args *a = gpr_malloc(sizeof(*a));
a->ops = calld->waiting_ops;
a->nops = calld->waiting_ops_count;
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 2c4364b259..df35904b85 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -504,13 +504,14 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *closure) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_transport_op op;
grpc_channel_element *elem;
- op->connectivity_state = state;
- op->on_connectivity_state_change = closure;
- op->bind_pollset_set = interested_parties;
+ memset(&op, 0, sizeof(op));
+ op.connectivity_state = state;
+ op.on_connectivity_state_change = closure;
+ op.bind_pollset_set = interested_parties;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, op);
+ elem->filter->start_transport_op(exec_ctx, elem, &op);
}
void grpc_connected_subchannel_notify_on_state_change(
@@ -524,11 +525,12 @@ void grpc_connected_subchannel_notify_on_state_change(
void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *con,
grpc_closure *closure) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_transport_op op;
grpc_channel_element *elem;
- op->send_ping = closure;
+ memset(&op, 0, sizeof(op));
+ op.send_ping = closure;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, op);
+ elem->filter->start_transport_op(exec_ctx, elem, &op);
}
static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 53da0e5d70..00999e3b94 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -89,20 +89,13 @@ static const grpc_transport_vtable vtable;
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
-static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
-static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
-static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
+static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_error *error);
+ grpc_chttp2_transport *t, grpc_error *error,
+ const char *reason);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -112,6 +105,11 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
+/** Perform a transport_op */
+static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *transport_op);
+
/** Cancel a stream: coming from the transport API */
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
@@ -123,10 +121,22 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global,
grpc_error *error);
+/** Add endpoint from this transport to pollset */
+static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_ignored, void *pollset);
+static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_ignored,
+ void *pollset_set);
+
/** Start new streams that have been created if we can */
static void maybe_start_some_streams(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
+static void finish_global_actions(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
+
static void connectivity_state_set(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_connectivity_state state, grpc_error *error, const char *reason);
@@ -139,16 +149,14 @@ static void incoming_byte_stream_update_flow_control(
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
- void *byte_stream,
- grpc_error *error_ignored);
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *byte_stream);
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_error *error);
-static void set_write_state(grpc_chttp2_transport *t,
- grpc_chttp2_write_state state, const char *reason);
-
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -157,7 +165,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
size_t i;
- grpc_endpoint_destroy(exec_ctx, t->ep);
+ gpr_mu_lock(&t->executor.mu);
+
+ GPR_ASSERT(t->ep == NULL);
gpr_slice_buffer_destroy(&t->global.qbuf);
@@ -181,7 +191,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
- grpc_combiner_destroy(exec_ctx, t->executor.combiner);
+ gpr_mu_unlock(&t->executor.mu);
+ gpr_mu_destroy(&t->executor.mu);
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
@@ -239,13 +250,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
memset(t, 0, sizeof(*t));
t->base.vtable = &vtable;
- t->executor.write_state = GRPC_CHTTP2_WRITES_CORKED;
t->ep = ep;
- /* one ref is for destroy */
- gpr_ref_init(&t->refs, 1);
+ /* one ref is for destroy, the other for when ep becomes NULL */
+ gpr_ref_init(&t->refs, 2);
/* ref is dropped at transport close() */
gpr_ref_init(&t->shutdown_ep_refs, 1);
- t->executor.combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep));
+ gpr_mu_init(&t->executor.mu);
t->peer_string = grpc_endpoint_get_peer(ep);
t->endpoint_reading = 1;
t->global.next_stream_id = is_client ? 1 : 2;
@@ -271,22 +281,23 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor);
grpc_closure_init(&t->writing_action, writing_action, t);
grpc_closure_init(&t->reading_action, reading_action, t);
- grpc_closure_init(&t->reading_action_locked, reading_action_locked, t);
grpc_closure_init(&t->parsing_action, parsing_action, t);
- grpc_closure_init(&t->post_parse_locked, post_parse_locked, t);
- grpc_closure_init(&t->initiate_writing, initiate_writing_locked, t);
- grpc_closure_init(&t->terminate_writing, terminate_writing_with_lock, t);
- grpc_closure_init(&t->initiate_read_flush_locked, initiate_read_flush_locked,
- t);
- grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
- &t->writing);
+ grpc_closure_init(&t->initiate_writing, initiate_writing, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser);
+ grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
+ &t->writing);
gpr_slice_buffer_init(&t->read_buffer);
+ if (is_client) {
+ gpr_slice_buffer_add(
+ &t->global.qbuf,
+ gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write");
+ }
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet
large enough that the exponential growth should happen nicely when it's
@@ -309,13 +320,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->global.force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->global.sent_local_settings = 0;
- if (is_client) {
- gpr_slice_buffer_add(
- &t->writing.outbuf,
- gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
- grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write");
- }
-
/* configure http2 the way we like it */
if (is_client) {
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
@@ -420,39 +424,47 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
}
-
- set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "uncork");
- grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "init");
}
-static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- grpc_chttp2_transport *t = tp;
+static void destroy_transport_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_ignored,
+ void *arg_ignored) {
t->destroying = 1;
drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
- UNREF_TRANSPORT(exec_ctx, t, "destroy");
}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_combiner_execute(exec_ctx, t->executor.combiner,
- grpc_closure_create(destroy_transport_locked, t),
- GRPC_ERROR_NONE);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, destroy_transport_locked,
+ NULL, 0);
+ UNREF_TRANSPORT(exec_ctx, t, "destroy");
}
/** block grpc_endpoint_shutdown being called until a paired
allow_endpoint_shutdown is made */
static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
+ GPR_ASSERT(t->ep);
gpr_ref(&t->shutdown_ep_refs);
}
static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
if (gpr_unref(&t->shutdown_ep_refs)) {
- grpc_endpoint_shutdown(exec_ctx, t->ep);
+ if (t->ep) {
+ grpc_endpoint_shutdown(exec_ctx, t->ep);
+ }
}
}
+static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ grpc_endpoint_destroy(exec_ctx, t->ep);
+ t->ep = NULL;
+ /* safe because we'll still have the ref for write */
+ UNREF_TRANSPORT(exec_ctx, t, "disconnect");
+}
+
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
@@ -463,7 +475,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
t->closed = 1;
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
- allow_endpoint_shutdown_locked(exec_ctx, t);
+ if (t->ep) {
+ allow_endpoint_shutdown_locked(exec_ctx, t);
+ }
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream_global *stream_global;
@@ -497,23 +511,21 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
}
#endif
-static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
- grpc_error *error) {
- grpc_chttp2_stream *s = sp;
- grpc_chttp2_register_stream(s->t, s);
- GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "init");
+static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *arg_ignored) {
+ grpc_chttp2_register_stream(t, s);
}
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data) {
- GPR_TIMER_BEGIN("init_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
memset(s, 0, sizeof(*s));
- s->t = t;
s->refcount = refcount;
/* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively
@@ -548,21 +560,16 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->global.in_stream_map = true;
}
- grpc_closure_init(&s->init_stream, finish_init_stream_locked, s);
- GRPC_CHTTP2_STREAM_REF(&s->global, "init");
- grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->init_stream,
- GRPC_ERROR_NONE);
-
- GPR_TIMER_END("init_stream", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked,
+ NULL, 0);
return 0;
}
-static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
- grpc_error *error) {
+static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *arg) {
grpc_byte_stream *bs;
- grpc_chttp2_stream *s = sp;
- grpc_chttp2_transport *t = s->t;
GPR_TIMER_BEGIN("destroy_stream", 0);
@@ -581,7 +588,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
@@ -618,20 +625,16 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_TIMER_END("destroy_stream", 0);
- gpr_free(s->destroy_stream_arg);
+ gpr_free(arg);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, void *and_free_memory) {
- GPR_TIMER_BEGIN("destroy_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- s->destroy_stream_arg = and_free_memory;
- grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s);
- grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->destroy_stream,
- GRPC_ERROR_NONE);
- GPR_TIMER_END("destroy_stream", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked,
+ and_free_memory, 0);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@@ -662,10 +665,12 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
static const char *write_state_name(grpc_chttp2_write_state state) {
switch (state) {
- case GRPC_CHTTP2_WRITES_CORKED:
- return "CORKED";
case GRPC_CHTTP2_WRITING_INACTIVE:
return "INACTIVE";
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ return "REQUESTED[p=0]";
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ return "REQUESTED[p=1]";
case GRPC_CHTTP2_WRITE_SCHEDULED:
return "SCHEDULED";
case GRPC_CHTTP2_WRITING:
@@ -688,18 +693,120 @@ static void set_write_state(grpc_chttp2_transport *t,
t->executor.write_state = state;
}
-static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- grpc_chttp2_transport *t = tp;
- GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
- start_writing(exec_ctx, t);
+static void finish_global_actions(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ grpc_chttp2_executor_action_header *hdr;
+ grpc_chttp2_executor_action_header *next;
+
+ GPR_TIMER_BEGIN("finish_global_actions", 0);
+
+ for (;;) {
+ check_read_ops(exec_ctx, &t->global);
+
+ gpr_mu_lock(&t->executor.mu);
+ if (t->executor.pending_actions_head != NULL) {
+ hdr = t->executor.pending_actions_head;
+ t->executor.pending_actions_head = t->executor.pending_actions_tail =
+ NULL;
+ gpr_mu_unlock(&t->executor.mu);
+ while (hdr != NULL) {
+ GPR_TIMER_BEGIN("chttp2:locked_action", 0);
+ hdr->action(exec_ctx, t, hdr->stream, hdr->arg);
+ GPR_TIMER_END("chttp2:locked_action", 0);
+ next = hdr->next;
+ gpr_free(hdr);
+ UNREF_TRANSPORT(exec_ctx, t, "pending_action");
+ hdr = next;
+ }
+ continue;
+ } else {
+ t->executor.global_active = false;
+ switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking");
+ REF_TRANSPORT(t, "initiate_writing");
+ gpr_mu_unlock(&t->executor.mu);
+ grpc_exec_ctx_sched(
+ exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE,
+ t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL);
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ start_writing(exec_ctx, t);
+ gpr_mu_unlock(&t->executor.mu);
+ break;
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ case GRPC_CHTTP2_WRITING:
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ gpr_mu_unlock(&t->executor.mu);
+ break;
+ }
+ }
+ break;
+ }
+
+ GPR_TIMER_END("finish_global_actions", 0);
}
-static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- grpc_chttp2_transport *t = tp;
- t->executor.check_read_ops_scheduled = false;
- check_read_ops(exec_ctx, &t->global);
+void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *optional_stream,
+ grpc_chttp2_locked_action action,
+ void *arg, size_t sizeof_arg) {
+ grpc_chttp2_executor_action_header *hdr;
+
+ GPR_TIMER_BEGIN("grpc_chttp2_run_with_global_lock", 0);
+
+ REF_TRANSPORT(t, "run_global");
+ gpr_mu_lock(&t->executor.mu);
+
+ for (;;) {
+ if (!t->executor.global_active) {
+ t->executor.global_active = 1;
+ gpr_mu_unlock(&t->executor.mu);
+
+ GPR_TIMER_BEGIN("chttp2:locked_action", 0);
+ action(exec_ctx, t, optional_stream, arg);
+ GPR_TIMER_END("chttp2:locked_action", 0);
+
+ finish_global_actions(exec_ctx, t);
+ } else {
+ gpr_mu_unlock(&t->executor.mu);
+
+ hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg);
+ hdr->stream = optional_stream;
+ hdr->action = action;
+ if (sizeof_arg == 0) {
+ hdr->arg = arg;
+ } else {
+ hdr->arg = hdr + 1;
+ memcpy(hdr->arg, arg, sizeof_arg);
+ }
+
+ gpr_mu_lock(&t->executor.mu);
+ if (!t->executor.global_active) {
+ /* global lock was released while allocating memory: release & retry */
+ gpr_free(hdr);
+ continue;
+ }
+ hdr->next = NULL;
+ if (t->executor.pending_actions_head != NULL) {
+ t->executor.pending_actions_tail =
+ t->executor.pending_actions_tail->next = hdr;
+ } else {
+ t->executor.pending_actions_tail = t->executor.pending_actions_head =
+ hdr;
+ }
+ REF_TRANSPORT(t, "pending_action");
+ gpr_mu_unlock(&t->executor.mu);
+ }
+ break;
+ }
+
+ UNREF_TRANSPORT(exec_ctx, t, "run_global");
+
+ GPR_TIMER_END("grpc_chttp2_run_with_global_lock", 0);
}
/*******************************************************************************
@@ -709,12 +816,11 @@ static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp,
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
bool covered_by_poller, const char *reason) {
- GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0);
-
/* Perform state checks, and transition to a scheduled state if appropriate.
- If we are inactive, schedule a write chain to begin once the transport
- combiner finishes any executions in its current batch (which may be
- scheduled AFTER this code executes). The write chain will:
+ Each time we finish the global lock execution, we check if we need to
+ write. If we do:
+ - (if there is a poller surrounding the write) schedule
+ initiate_writing, which locks and calls initiate_writing_locked to...
- call start_writing, which verifies (under the global lock) that there
are things that need to be written by calling
grpc_chttp2_unlocking_check_writes, and if so schedules writing_action
@@ -724,28 +830,31 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
to do *another* write immediately, and if so loops back to
start_writing.
- Current problems:
+ Current problems:
- too much lock entry/exiting
- the writing thread can become stuck indefinitely (punt through the
workqueue periodically to fix) */
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
switch (t->executor.write_state) {
- case GRPC_CHTTP2_WRITES_CORKED:
- break;
case GRPC_CHTTP2_WRITING_INACTIVE:
- set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, reason);
- REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
- &t->initiate_writing, GRPC_ERROR_NONE,
- covered_by_poller);
+ set_write_state(t, covered_by_poller
+ ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER
+ : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ reason);
break;
- case GRPC_CHTTP2_WRITE_SCHEDULED:
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ /* nothing to do: write already requested */
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
if (covered_by_poller) {
/* upgrade to note poller is available to cover the write */
- grpc_combiner_force_async_finally(t->executor.combiner);
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason);
}
break;
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ /* nothing to do: write already scheduled */
+ break;
case GRPC_CHTTP2_WRITING:
set_write_state(t,
covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER
@@ -762,15 +871,15 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
}
break;
}
- GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
- GPR_TIMER_BEGIN("start_writing", 0);
- GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
+ GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED ||
+ t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER);
if (!t->closed &&
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing");
+ REF_TRANSPORT(t, "writing");
prevent_endpoint_shutdown(t);
grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
} else {
@@ -781,10 +890,25 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE,
"start_writing:nothing_to_write");
}
- end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE);
- UNREF_TRANSPORT(exec_ctx, t, "writing");
+ end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE, "Nothing to write");
+ if (t->ep && !t->endpoint_reading) {
+ destroy_endpoint(exec_ctx, t);
+ }
}
- GPR_TIMER_END("start_writing", 0);
+}
+
+static void initiate_writing_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *arg_ignored) {
+ start_writing(exec_ctx, t);
+ UNREF_TRANSPORT(exec_ctx, t, "initiate_writing");
+}
+
+static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked,
+ NULL, 0);
}
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
@@ -818,10 +942,15 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* error may be GRPC_ERROR_NONE if there is no error allocated yet.
In that case, use "reason" as the text for a new error. */
static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_error *error) {
+ grpc_chttp2_transport *t, grpc_error *error,
+ const char *reason) {
grpc_chttp2_stream_global *stream_global;
while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
&stream_global)) {
+ if (error == GRPC_ERROR_NONE && reason != NULL) {
+ /* create error object. */
+ error = GRPC_ERROR_CREATE(reason);
+ }
fail_pending_writes(exec_ctx, &t->global, stream_global,
GRPC_ERROR_REF(error));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
@@ -829,10 +958,12 @@ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- GPR_TIMER_BEGIN("terminate_writing_with_lock", 0);
- grpc_chttp2_transport *t = tp;
+static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_ignored,
+ void *a) {
+ grpc_error *error = a;
+
allow_endpoint_shutdown_locked(exec_ctx, t);
if (error != GRPC_ERROR_NONE) {
@@ -841,46 +972,39 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
- end_waiting_for_write(exec_ctx, t, GRPC_ERROR_REF(error));
+ end_waiting_for_write(exec_ctx, t, error, NULL);
switch (t->executor.write_state) {
- case GRPC_CHTTP2_WRITES_CORKED:
case GRPC_CHTTP2_WRITING_INACTIVE:
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
case GRPC_CHTTP2_WRITE_SCHEDULED:
GPR_UNREACHABLE_CODE(break);
case GRPC_CHTTP2_WRITING:
- GPR_TIMER_MARK("state=writing", 0);
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing");
break;
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
- GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
- set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
- REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
- &t->initiate_writing, GRPC_ERROR_NONE,
- true);
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
+ "terminate_writing");
break;
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
- GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
- set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
- REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
- &t->initiate_writing, GRPC_ERROR_NONE,
- false);
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ "terminate_writing");
break;
}
+ if (t->ep && !t->endpoint_reading) {
+ destroy_endpoint(exec_ctx, t);
+ }
+
UNREF_TRANSPORT(exec_ctx, t, "writing");
- GPR_TIMER_END("terminate_writing_with_lock", 0);
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
void *transport_writing, grpc_error *error) {
- GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0);
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
- grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->terminate_writing,
- GRPC_ERROR_REF(error));
- GPR_TIMER_END("grpc_chttp2_terminate_writing", 0);
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, t, NULL, terminate_writing_with_lock, GRPC_ERROR_REF(error), 0);
}
static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
@@ -1024,22 +1148,15 @@ static int contains_non_ok_status(
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
-static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
- grpc_error *error_ignored) {
+static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *stream_op) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
grpc_transport_stream_op *op = stream_op;
- grpc_chttp2_transport *t = op->transport_private.args[0];
- grpc_chttp2_stream *s = op->transport_private.args[1];
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global = &s->global;
- if (grpc_http_trace) {
- char *str = grpc_transport_stream_op_string(op);
- gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s", str);
- gpr_free(str);
- }
-
grpc_closure *on_complete = op->on_complete;
if (on_complete == NULL) {
on_complete = grpc_closure_create(do_nothing, NULL);
@@ -1094,8 +1211,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else {
if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (!stream_global->write_closed) {
if (transport_global->is_client) {
@@ -1162,8 +1278,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (contains_non_ok_status(transport_global,
op->send_trailing_metadata)) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (stream_global->write_closed) {
stream_global->send_trailing_metadata = NULL;
@@ -1188,8 +1303,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
stream_global->recv_initial_metadata_ready =
op->recv_initial_metadata_ready;
stream_global->recv_initial_metadata = op->recv_initial_metadata;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (op->recv_message != NULL) {
@@ -1203,8 +1317,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
exec_ctx, transport_global, stream_global,
transport_global->stream_lookahead, 0);
}
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (op->recv_trailing_metadata != NULL) {
@@ -1213,30 +1326,21 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
add_closure_barrier(on_complete);
stream_global->recv_trailing_metadata = op->recv_trailing_metadata;
stream_global->final_metadata_requested = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
grpc_chttp2_complete_closure_step(exec_ctx, transport_global, stream_global,
&on_complete, GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op_locked", 0);
- GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "perform_stream_op");
}
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) {
- GPR_TIMER_BEGIN("perform_stream_op", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked,
- op);
- op->transport_private.args[0] = gt;
- op->transport_private.args[1] = gs;
- GRPC_CHTTP2_STREAM_REF(&s->global, "perform_stream_op");
- grpc_combiner_execute(exec_ctx, t->executor.combiner,
- &op->transport_private.closure, GRPC_ERROR_NONE);
- GPR_TIMER_END("perform_stream_op", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, s, perform_stream_op_locked, op,
+ sizeof(*op));
}
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1258,20 +1362,13 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping");
}
-typedef struct ack_ping_args {
- grpc_closure closure;
- grpc_chttp2_transport *t;
- uint8_t opaque_8bytes[8];
-} ack_ping_args;
-
-static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a,
- grpc_error *error_ignored) {
- ack_ping_args *args = a;
+static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *opaque_8bytes) {
grpc_chttp2_outstanding_ping *ping;
- grpc_chttp2_transport_global *transport_global = &args->t->global;
+ grpc_chttp2_transport_global *transport_global = &t->global;
for (ping = transport_global->pings.next; ping != &transport_global->pings;
ping = ping->next) {
- if (0 == memcmp(args->opaque_8bytes, ping->id, 8)) {
+ if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
@@ -1279,27 +1376,21 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a,
break;
}
}
- UNREF_TRANSPORT(exec_ctx, args->t, "ack_ping");
- gpr_free(args);
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *transport_parsing,
const uint8_t *opaque_8bytes) {
- ack_ping_args *args = gpr_malloc(sizeof(*args));
- args->t = TRANSPORT_FROM_PARSING(transport_parsing);
- memcpy(args->opaque_8bytes, opaque_8bytes, sizeof(args->opaque_8bytes));
- grpc_closure_init(&args->closure, ack_ping_locked, args);
- REF_TRANSPORT(args->t, "ack_ping");
- grpc_combiner_execute(exec_ctx, args->t->executor.combiner, &args->closure,
- GRPC_ERROR_NONE);
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing), NULL,
+ ack_ping_locked, (void *)opaque_8bytes, 8);
}
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
- void *stream_op,
- grpc_error *error_ignored) {
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *stream_op) {
grpc_transport_op *op = stream_op;
- grpc_chttp2_transport *t = op->transport_private.args[0];
grpc_error *close_transport = op->disconnect_with_error;
/* If there's a set_accept_stream ensure that we're not parsing
@@ -1311,6 +1402,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
return;
}
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
@@ -1336,11 +1429,11 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->bind_pollset) {
- grpc_endpoint_add_to_pollset(exec_ctx, t->ep, op->bind_pollset);
+ add_to_pollset_locked(exec_ctx, t, NULL, op->bind_pollset);
}
if (op->bind_pollset_set) {
- grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set);
+ add_to_pollset_set_locked(exec_ctx, t, NULL, op->bind_pollset_set);
}
if (op->send_ping) {
@@ -1350,21 +1443,13 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
if (close_transport != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, close_transport);
}
-
- grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
-
- UNREF_TRANSPORT(exec_ctx, t, "transport_op");
}
static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- op->transport_private.args[0] = gt;
- grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked,
- op);
- REF_TRANSPORT(t, "transport_op");
- grpc_combiner_execute(exec_ctx, t->executor.combiner,
- &op->transport_private.closure, GRPC_ERROR_NONE);
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, t, NULL, perform_transport_op_locked, op, sizeof(*op));
}
/*******************************************************************************
@@ -1373,7 +1458,6 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global) {
- GPR_TIMER_BEGIN("check_read_ops", 0);
grpc_chttp2_stream_global *stream_global;
grpc_byte_stream *bs;
while (
@@ -1383,7 +1467,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
if (stream_global->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(
@@ -1406,7 +1490,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
stream_global->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
if (stream_global->incoming_frames.head != NULL) {
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
@@ -1427,7 +1511,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
if (stream_global->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(
@@ -1448,7 +1532,6 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
}
}
- GPR_TIMER_END("check_read_ops", 0);
}
static void decrement_active_streams_locked(
@@ -1456,8 +1539,7 @@ static void decrement_active_streams_locked(
grpc_chttp2_stream_global *stream_global) {
if ((stream_global->all_incoming_byte_streams_finished =
gpr_unref(&stream_global->active_streams))) {
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
}
@@ -1561,8 +1643,7 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
}
if (due_to_error != GRPC_ERROR_NONE && !stream_global->seen_error) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1, due_to_error);
@@ -1574,8 +1655,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
grpc_status_code status, gpr_slice *slice) {
if (status != GRPC_STATUS_OK) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
/* stream_global->recv_trailing_metadata_finished gives us a
last chance replacement: we've received trailing metadata,
@@ -1599,8 +1679,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
grpc_mdstr_from_slice(gpr_slice_ref(*slice))));
}
stream_global->published_trailing_metadata = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (slice) {
gpr_slice_unref(*slice);
@@ -1660,8 +1739,7 @@ void grpc_chttp2_mark_stream_closed(
GRPC_ERROR_UNREF(error);
return;
}
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
if (close_reads && !stream_global->read_closed) {
stream_global->read_closed_error = GRPC_ERROR_REF(error);
stream_global->read_closed = true;
@@ -1842,10 +1920,6 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error) {
- if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
- error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAVAILABLE);
- }
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
end_all_the_calls(exec_ctx, t, error);
}
@@ -1881,12 +1955,16 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
* INPUT PROCESSING - PARSING
*/
+static void reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
-static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg);
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg);
static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
@@ -1894,20 +1972,16 @@ static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
reading_action_locked ->
(parse_unlocked -> post_parse_locked)? ->
post_reading_action_locked */
- GPR_TIMER_BEGIN("reading_action", 0);
- grpc_chttp2_transport *t = tp;
- grpc_combiner_execute(exec_ctx, t->executor.combiner,
- &t->reading_action_locked, GRPC_ERROR_REF(error));
- GPR_TIMER_END("reading_action", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked,
+ GRPC_ERROR_REF(error), 0);
}
-static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- GPR_TIMER_BEGIN("reading_action_locked", 0);
-
- grpc_chttp2_transport *t = tp;
+static void reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg) {
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
+ grpc_error *error = arg;
GPR_ASSERT(!t->executor.parsing_active);
if (!t->closed) {
@@ -1916,13 +1990,10 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
- grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, GRPC_ERROR_REF(error),
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL);
} else {
- post_reading_action_locked(exec_ctx, t, error);
+ post_reading_action_locked(exec_ctx, t, s_unused, arg);
}
-
- GPR_TIMER_END("reading_action_locked", 0);
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -1974,15 +2045,13 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
GRPC_ERROR_UNREF(errors[i]);
}
- grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->post_parse_locked,
- err);
GPR_TIMER_END("reading_action.parse", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, err,
+ 0);
}
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- GPR_TIMER_BEGIN("post_parse_locked", 0);
- grpc_chttp2_transport *t = arg;
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg) {
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
/* copy parsing qbuf to global qbuf */
@@ -2008,7 +2077,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (t->post_parsing_op) {
grpc_transport_op *op = t->post_parsing_op;
t->post_parsing_op = NULL;
- perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE);
+ perform_transport_op_locked(exec_ctx, t, NULL, op);
gpr_free(op);
}
/* if a stream is in the stream map, and gets cancelled, we need to
@@ -2025,32 +2094,39 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
- post_reading_action_locked(exec_ctx, t, error);
- GPR_TIMER_END("post_parse_locked", 0);
+ post_reading_action_locked(exec_ctx, t, s_unused, arg);
}
-static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- GPR_TIMER_BEGIN("post_reading_action_locked", 0);
- grpc_chttp2_transport *t = arg;
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *arg) {
+ grpc_error *error = arg;
bool keep_reading = false;
- GRPC_ERROR_REF(error);
if (error == GRPC_ERROR_NONE && t->closed) {
error = GRPC_ERROR_CREATE("Transport closed");
}
if (error != GRPC_ERROR_NONE) {
+ if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ }
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
if (grpc_http_write_state_trace) {
gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t,
write_state_name(t->executor.write_state));
}
+ if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) {
+ destroy_endpoint(exec_ctx, t);
+ }
} else if (!t->closed) {
keep_reading = true;
REF_TRANSPORT(t, "keep_reading");
prevent_endpoint_shutdown(t);
}
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
+ GRPC_ERROR_UNREF(error);
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action);
@@ -2059,9 +2135,6 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
} else {
UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}
- GRPC_ERROR_UNREF(error);
-
- GPR_TIMER_END("post_reading_action_locked", 0);
}
/*******************************************************************************
@@ -2083,16 +2156,36 @@ static void connectivity_state_set(
* POLLSET STUFF
*/
+static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *pollset) {
+ if (t->ep) {
+ grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
+ }
+}
+
+static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *pollset_set) {
+ if (t->ep) {
+ grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
+ }
+}
+
static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset *pollset) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
+ /* TODO(ctiller): keep pollset alive */
+ grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
+ (grpc_chttp2_stream *)gs,
+ add_to_pollset_locked, pollset, 0);
}
static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset_set *pollset_set) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
+ grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
+ (grpc_chttp2_stream *)gs,
+ add_to_pollset_set_locked, pollset_set, 0);
}
/*******************************************************************************
@@ -2104,7 +2197,6 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
if (gpr_unref(&bs->refs)) {
GRPC_ERROR_UNREF(bs->error);
gpr_slice_buffer_destroy(&bs->slices);
- gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
}
}
@@ -2150,34 +2242,38 @@ static void incoming_byte_stream_update_flow_control(
}
}
+typedef struct {
+ grpc_chttp2_incoming_byte_stream *byte_stream;
+ gpr_slice *slice;
+ size_t max_size_hint;
+ grpc_closure *on_complete;
+} incoming_byte_stream_next_arg;
+
static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
- void *argp,
- grpc_error *error_ignored) {
- grpc_chttp2_incoming_byte_stream *bs = argp;
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *argp) {
+ incoming_byte_stream_next_arg *arg = argp;
+ grpc_chttp2_incoming_byte_stream *bs =
+ (grpc_chttp2_incoming_byte_stream *)arg->byte_stream;
grpc_chttp2_transport_global *transport_global = &bs->transport->global;
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
if (bs->is_tail) {
- gpr_mu_lock(&bs->slice_mu);
- size_t cur_length = bs->slices.length;
- gpr_mu_unlock(&bs->slice_mu);
- incoming_byte_stream_update_flow_control(
- exec_ctx, transport_global, stream_global,
- bs->next_action.max_size_hint, cur_length);
- }
- gpr_mu_lock(&bs->slice_mu);
+ incoming_byte_stream_update_flow_control(exec_ctx, transport_global,
+ stream_global, arg->max_size_hint,
+ bs->slices.length);
+ }
if (bs->slices.count > 0) {
- *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices);
- grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE,
- NULL);
+ *arg->slice = gpr_slice_buffer_take_first(&bs->slices);
+ grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL);
} else if (bs->error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete,
- GRPC_ERROR_REF(bs->error), NULL);
+ grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error),
+ NULL);
} else {
- bs->on_next = bs->next_action.on_complete;
- bs->next = bs->next_action.slice;
+ bs->on_next = arg->on_complete;
+ bs->next = arg->slice;
}
- gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_unref(exec_ctx, bs);
}
@@ -2185,18 +2281,13 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
gpr_slice *slice, size_t max_size_hint,
grpc_closure *on_complete) {
- GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
+ incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete};
gpr_ref(&bs->refs);
- bs->next_action.slice = slice;
- bs->next_action.max_size_hint = max_size_hint;
- bs->next_action.on_complete = on_complete;
- grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked,
- bs);
- grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
- &bs->next_action.closure, GRPC_ERROR_NONE);
- GPR_TIMER_END("incoming_byte_stream_next", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_next_locked, &arg,
+ sizeof(arg));
return 0;
}
@@ -2204,8 +2295,9 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
- void *byte_stream,
- grpc_error *error_ignored) {
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *byte_stream) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
decrement_active_streams_locked(exec_ctx, &bs->transport->global,
@@ -2215,14 +2307,10 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
- GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked,
- bs);
- grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
- &bs->destroy_action, GRPC_ERROR_NONE);
- GPR_TIMER_END("incoming_byte_stream_destroy", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_destroy_locked, bs, 0);
}
typedef struct {
@@ -2230,45 +2318,90 @@ typedef struct {
gpr_slice slice;
} incoming_byte_stream_push_arg;
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs,
- gpr_slice slice) {
- gpr_mu_lock(&bs->slice_mu);
+static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *argp) {
+ incoming_byte_stream_push_arg *arg = argp;
+ grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream;
if (bs->on_next != NULL) {
- *bs->next = slice;
+ *bs->next = arg->slice;
grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
bs->on_next = NULL;
} else {
- gpr_slice_buffer_add(&bs->slices, slice);
+ gpr_slice_buffer_add(&bs->slices, arg->slice);
}
- gpr_mu_unlock(&bs->slice_mu);
+ incoming_byte_stream_unref(exec_ctx, bs);
}
-static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx,
- void *bsp, grpc_error *error) {
- grpc_chttp2_incoming_byte_stream *bs = bsp;
- if (error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
- bs->on_next = NULL;
- GRPC_ERROR_UNREF(bs->error);
- bs->error = error;
- }
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ gpr_slice slice) {
+ incoming_byte_stream_push_arg arg = {bs, slice};
+ gpr_ref(&bs->refs);
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_push_locked, &arg,
+ sizeof(arg));
+}
+
+typedef struct {
+ grpc_chttp2_incoming_byte_stream *bs;
+ grpc_error *error;
+} bs_fail_args;
+
+static bs_fail_args *make_bs_fail_args(grpc_chttp2_incoming_byte_stream *bs,
+ grpc_error *error) {
+ bs_fail_args *a = gpr_malloc(sizeof(*a));
+ a->bs = bs;
+ a->error = error;
+ return a;
+}
+
+static void incoming_byte_stream_finished_failed_locked(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ void *argp) {
+ bs_fail_args *a = argp;
+ grpc_chttp2_incoming_byte_stream *bs = a->bs;
+ grpc_error *error = a->error;
+ gpr_free(a);
+ grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
+ bs->on_next = NULL;
+ GRPC_ERROR_UNREF(bs->error);
+ bs->error = error;
+ incoming_byte_stream_unref(exec_ctx, bs);
+}
+
+static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *argp) {
+ grpc_chttp2_incoming_byte_stream *bs = argp;
incoming_byte_stream_unref(exec_ctx, bs);
}
void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error, int from_parsing_thread) {
- GPR_TIMER_BEGIN("grpc_chttp2_incoming_byte_stream_finished", 0);
if (from_parsing_thread) {
- grpc_closure_init(&bs->finished_action,
- incoming_byte_stream_finished_locked, bs);
- grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
- &bs->finished_action, GRPC_ERROR_REF(error));
+ if (error == GRPC_ERROR_NONE) {
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_finished_ok_locked,
+ bs, 0);
+ } else {
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_finished_failed_locked,
+ make_bs_fail_args(bs, error), 0);
+ }
} else {
- incoming_byte_stream_finished_locked(exec_ctx, bs, error);
+ if (error == GRPC_ERROR_NONE) {
+ incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport,
+ bs->stream, bs);
+ } else {
+ incoming_byte_stream_finished_failed_locked(
+ exec_ctx, bs->transport, bs->stream, make_bs_fail_args(bs, error));
+ }
}
- GPR_TIMER_END("grpc_chttp2_incoming_byte_stream_finished", 0);
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@@ -2281,7 +2414,6 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.next = incoming_byte_stream_next;
incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
- gpr_mu_init(&incoming_byte_stream->slice_mu);
gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 04b788b702..d67c014e54 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -48,7 +48,6 @@
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
-#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -162,20 +161,9 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_chttp2_transport *transport;
grpc_chttp2_stream *stream;
int is_tail;
-
- gpr_mu slice_mu; // protects slices, on_next
gpr_slice_buffer slices;
grpc_closure *on_next;
gpr_slice *next;
-
- struct {
- grpc_closure closure;
- gpr_slice *slice;
- size_t max_size_hint;
- grpc_closure *on_complete;
- } next_action;
- grpc_closure destroy_action;
- grpc_closure finished_action;
};
typedef struct {
@@ -308,11 +296,23 @@ struct grpc_chttp2_transport_parsing {
int64_t outgoing_window;
};
+typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *arg);
+
+typedef struct grpc_chttp2_executor_action_header {
+ grpc_chttp2_stream *stream;
+ grpc_chttp2_locked_action action;
+ struct grpc_chttp2_executor_action_header *next;
+ void *arg;
+} grpc_chttp2_executor_action_header;
+
typedef enum {
- /** no writing activity allowed */
- GRPC_CHTTP2_WRITES_CORKED,
/** no writing activity */
GRPC_CHTTP2_WRITING_INACTIVE,
+ /** write has been requested, but not scheduled yet */
+ GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
+ GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
/** write has been requested and scheduled against the workqueue */
GRPC_CHTTP2_WRITE_SCHEDULED,
/** write has been initiated after being reaped from the workqueue */
@@ -333,7 +333,7 @@ struct grpc_chttp2_transport {
gpr_refcount shutdown_ep_refs;
struct {
- grpc_combiner *combiner;
+ gpr_mu mu;
/** is a thread currently in the global lock */
bool global_active;
@@ -341,8 +341,9 @@ struct grpc_chttp2_transport {
bool parsing_active;
/** write execution state of the transport */
grpc_chttp2_write_state write_state;
- /** has a check_read_ops been scheduled */
- bool check_read_ops_scheduled;
+
+ grpc_chttp2_executor_action_header *pending_actions_head;
+ grpc_chttp2_executor_action_header *pending_actions_tail;
} executor;
/** is the transport destroying itself? */
@@ -379,16 +380,10 @@ struct grpc_chttp2_transport {
grpc_closure writing_action;
/** closure to start reading from the endpoint */
grpc_closure reading_action;
- grpc_closure reading_action_locked;
- grpc_closure post_parse_locked;
/** closure to actually do parsing */
grpc_closure parsing_action;
/** closure to initiate writing */
grpc_closure initiate_writing;
- /** closure to finish writing */
- grpc_closure terminate_writing;
- /** closure to flush read state up the stack */
- grpc_closure initiate_read_flush_locked;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@@ -532,16 +527,11 @@ struct grpc_chttp2_stream_parsing {
};
struct grpc_chttp2_stream {
- grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
grpc_chttp2_stream_global global;
grpc_chttp2_stream_writing writing;
grpc_chttp2_stream_parsing parsing;
- grpc_closure init_stream;
- grpc_closure destroy_stream;
- void *destroy_stream_arg;
-
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
uint8_t included[STREAM_LIST_COUNT];
};
@@ -636,7 +626,7 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_check_read_ops(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
bool grpc_chttp2_list_remove_check_read_ops(
grpc_chttp2_transport_global *transport_global,
@@ -716,6 +706,12 @@ void grpc_chttp2_complete_closure_step(
grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure,
grpc_error *error);
+void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *transport,
+ grpc_chttp2_stream *optional_stream,
+ grpc_chttp2_locked_action action,
+ void *arg, size_t sizeof_arg);
+
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 0e6d579ba9..482cd55c44 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -177,8 +177,7 @@ void grpc_chttp2_publish_reads(
stream_global->seen_error = true;
stream_global->exceeded_metadata_size =
stream_parsing->exceeded_metadata_size;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
/* flush stats to global stream state */
@@ -204,8 +203,7 @@ void grpc_chttp2_publish_reads(
stream_global->incoming_frames.tail->is_tail = 0;
}
if (stream_parsing->data_parser.incoming_frames.head != NULL) {
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
grpc_chttp2_incoming_frame_queue_merge(
&stream_global->incoming_frames,
@@ -221,8 +219,7 @@ void grpc_chttp2_publish_reads(
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
stream_parsing->metadata_buffer[0],
stream_global->received_initial_metadata);
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (!stream_global->published_trailing_metadata &&
stream_parsing->got_metadata_on_parse[1]) {
@@ -231,8 +228,7 @@ void grpc_chttp2_publish_reads(
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
stream_parsing->metadata_buffer[1],
stream_global->received_trailing_metadata);
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (stream_parsing->forced_close_error != GRPC_ERROR_NONE) {
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index 4dc4968248..2eb5f5f632 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -298,15 +298,8 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
}
void grpc_chttp2_list_add_check_read_ops(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
- grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
- if (!t->executor.check_read_ops_scheduled) {
- grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
- &t->initiate_read_flush_locked,
- GRPC_ERROR_NONE, false);
- t->executor.check_read_ops_scheduled = true;
- }
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_CHECK_READ_OPS);
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 979515bd54..311b26e354 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -55,6 +55,15 @@ int grpc_chttp2_unlocking_check_writes(
transport_global->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
+ /* simple writes are queued to qbuf, and flushed here */
+ gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf);
+ GPR_ASSERT(transport_global->qbuf.count == 0);
+
+ grpc_chttp2_hpack_compressor_set_max_table_size(
+ &transport_writing->hpack_compressor,
+ transport_global->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
+
if (transport_global->dirtied_local_settings &&
!transport_global->sent_local_settings) {
gpr_slice_buffer_add(
@@ -68,16 +77,6 @@ int grpc_chttp2_unlocking_check_writes(
transport_global->sent_local_settings = 1;
}
- /* simple writes are queued to qbuf, and flushed here */
- gpr_slice_buffer_move_into(&transport_global->qbuf,
- &transport_writing->outbuf);
- GPR_ASSERT(transport_global->qbuf.count == 0);
-
- grpc_chttp2_hpack_compressor_set_max_table_size(
- &transport_writing->hpack_compressor,
- transport_global->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
-
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
if (transport_writing->outgoing_window > 0) {
@@ -345,7 +344,6 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_cleanup_writing(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
- GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0);
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
@@ -384,5 +382,4 @@ void grpc_chttp2_cleanup_writing(
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
- GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0);
}