aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-07-11 12:43:23 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-07-11 12:43:23 -0700
commit93023e410b0ab351a9a4bb6312327f18d5fa4b68 (patch)
tree9e7b29cc078431630b1119e959016dfe263401e0 /src/core
parent4e43685e511a1a3c19ac4060a8ee9f7894dea94d (diff)
Progress to converting chttp2 to combiner locks
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c612
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h46
-rw-r--r--src/core/lib/iomgr/combiner.c4
-rw-r--r--src/core/lib/iomgr/combiner.h1
-rw-r--r--src/core/lib/transport/transport.h17
5 files changed, 245 insertions, 435 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 41506094de..a3020236cf 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -31,6 +31,9 @@
*
*/
+// TODO(ctiller): schedule check_read_ops whenever something is added to that
+// list
+
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include <math.h>
@@ -89,8 +92,14 @@ static const grpc_transport_vtable vtable;
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
-static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
+static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
@@ -104,11 +113,6 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
-/** Perform a transport_op */
-static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *transport_op);
-
/** Cancel a stream: coming from the transport API */
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
@@ -120,22 +124,10 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global,
grpc_error *error);
-/** Add endpoint from this transport to pollset */
-static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored, void *pollset);
-static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored,
- void *pollset_set);
-
/** Start new streams that have been created if we can */
static void maybe_start_some_streams(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
-static void finish_global_actions(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t);
-
static void connectivity_state_set(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_connectivity_state state, grpc_error *error, const char *reason);
@@ -148,9 +140,8 @@ static void incoming_byte_stream_update_flow_control(
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *byte_stream);
+ void *byte_stream,
+ grpc_error *error_ignored);
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
@@ -164,9 +155,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
size_t i;
- gpr_mu_lock(&t->executor.mu);
-
- GPR_ASSERT(t->ep == NULL);
+ grpc_endpoint_destroy(exec_ctx, t->ep);
gpr_slice_buffer_destroy(&t->global.qbuf);
@@ -190,8 +179,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
- gpr_mu_unlock(&t->executor.mu);
- gpr_mu_destroy(&t->executor.mu);
+ grpc_combiner_destroy(t->executor.combiner);
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
@@ -250,11 +238,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->base.vtable = &vtable;
t->ep = ep;
- /* one ref is for destroy, the other for when ep becomes NULL */
- gpr_ref_init(&t->refs, 2);
+ /* one ref is for destroy */
+ gpr_ref_init(&t->refs, 1);
/* ref is dropped at transport close() */
gpr_ref_init(&t->shutdown_ep_refs, 1);
- gpr_mu_init(&t->executor.mu);
+ t->executor.combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep));
t->peer_string = grpc_endpoint_get_peer(ep);
t->endpoint_reading = 1;
t->global.next_stream_id = is_client ? 1 : 2;
@@ -280,15 +268,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor);
grpc_closure_init(&t->writing_action, writing_action, t);
grpc_closure_init(&t->reading_action, reading_action, t);
+ grpc_closure_init(&t->reading_action_locked, reading_action_locked, t);
grpc_closure_init(&t->parsing_action, parsing_action, t);
- grpc_closure_init(&t->initiate_writing, initiate_writing, t);
+ grpc_closure_init(&t->post_parse_locked, post_parse_locked, t);
+ grpc_closure_init(&t->initiate_writing, initiate_writing_locked, t);
+ grpc_closure_init(&t->terminate_writing, terminate_writing_with_lock, t);
+ grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
+ &t->writing);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser);
- grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
- &t->writing);
gpr_slice_buffer_init(&t->read_buffer);
if (is_client) {
@@ -412,45 +403,34 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
-static void destroy_transport_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored,
- void *arg_ignored) {
+static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
t->destroying = 1;
drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, destroy_transport_locked,
- NULL, 0);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ grpc_closure_create(destroy_transport_locked, t),
+ GRPC_ERROR_NONE);
UNREF_TRANSPORT(exec_ctx, t, "destroy");
}
/** block grpc_endpoint_shutdown being called until a paired
allow_endpoint_shutdown is made */
static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
- GPR_ASSERT(t->ep);
gpr_ref(&t->shutdown_ep_refs);
}
static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
if (gpr_unref(&t->shutdown_ep_refs)) {
- if (t->ep) {
- grpc_endpoint_shutdown(exec_ctx, t->ep);
- }
+ grpc_endpoint_shutdown(exec_ctx, t->ep);
}
}
-static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
- grpc_endpoint_destroy(exec_ctx, t->ep);
- t->ep = NULL;
- /* safe because we'll still have the ref for write */
- UNREF_TRANSPORT(exec_ctx, t, "disconnect");
-}
-
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
@@ -461,9 +441,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
t->closed = 1;
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
- if (t->ep) {
- allow_endpoint_shutdown_locked(exec_ctx, t);
- }
+ allow_endpoint_shutdown_locked(exec_ctx, t);
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream_global *stream_global;
@@ -497,11 +475,10 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
}
#endif
-static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *arg_ignored) {
- grpc_chttp2_register_stream(t, s);
+static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
+ grpc_error *error) {
+ grpc_chttp2_stream *s = sp;
+ grpc_chttp2_register_stream(s->t, s);
}
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
@@ -509,6 +486,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
const void *server_data) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
+ s->t = t;
memset(s, 0, sizeof(*s));
@@ -545,16 +523,18 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->global.in_stream_map = true;
}
- grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked,
- NULL, 0);
+ grpc_closure_init(&s->init_stream, finish_init_stream_locked, s);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->init_stream,
+ GRPC_ERROR_NONE);
return 0;
}
-static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *arg) {
+static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
+ grpc_error *error) {
grpc_byte_stream *bs;
+ grpc_chttp2_stream *s = sp;
+ grpc_chttp2_transport *t = s->t;
GPR_TIMER_BEGIN("destroy_stream", 0);
@@ -573,7 +553,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
@@ -610,7 +590,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("destroy_stream", 0);
- gpr_free(arg);
+ gpr_free(s->destroy_stream_arg);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
@@ -618,8 +598,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked,
- and_free_memory, 0);
+ s->destroy_stream_arg = and_free_memory;
+ grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->destroy_stream,
+ GRPC_ERROR_NONE);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@@ -652,10 +634,6 @@ static const char *write_state_name(grpc_chttp2_write_state state) {
switch (state) {
case GRPC_CHTTP2_WRITING_INACTIVE:
return "INACTIVE";
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
- return "REQUESTED[p=0]";
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- return "REQUESTED[p=1]";
case GRPC_CHTTP2_WRITE_SCHEDULED:
return "SCHEDULED";
case GRPC_CHTTP2_WRITING:
@@ -678,119 +656,17 @@ static void set_write_state(grpc_chttp2_transport *t,
t->executor.write_state = state;
}
-static void finish_global_actions(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
- grpc_chttp2_executor_action_header *hdr;
- grpc_chttp2_executor_action_header *next;
-
- GPR_TIMER_BEGIN("finish_global_actions", 0);
-
- for (;;) {
- check_read_ops(exec_ctx, &t->global);
-
- gpr_mu_lock(&t->executor.mu);
- if (t->executor.pending_actions_head != NULL) {
- hdr = t->executor.pending_actions_head;
- t->executor.pending_actions_head = t->executor.pending_actions_tail =
- NULL;
- gpr_mu_unlock(&t->executor.mu);
- while (hdr != NULL) {
- GPR_TIMER_BEGIN("chttp2:locked_action", 0);
- hdr->action(exec_ctx, t, hdr->stream, hdr->arg);
- GPR_TIMER_END("chttp2:locked_action", 0);
- next = hdr->next;
- gpr_free(hdr);
- UNREF_TRANSPORT(exec_ctx, t, "pending_action");
- hdr = next;
- }
- continue;
- } else {
- t->executor.global_active = false;
- switch (t->executor.write_state) {
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking");
- REF_TRANSPORT(t, "initiate_writing");
- gpr_mu_unlock(&t->executor.mu);
- grpc_exec_ctx_sched(exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE,
- grpc_endpoint_get_workqueue(t->ep));
- break;
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
- start_writing(exec_ctx, t);
- gpr_mu_unlock(&t->executor.mu);
- break;
- case GRPC_CHTTP2_WRITING_INACTIVE:
- case GRPC_CHTTP2_WRITING:
- case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
- case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
- case GRPC_CHTTP2_WRITE_SCHEDULED:
- gpr_mu_unlock(&t->executor.mu);
- break;
- }
- }
- break;
- }
-
- GPR_TIMER_END("finish_global_actions", 0);
+static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
+ start_writing(exec_ctx, t);
}
-void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *optional_stream,
- grpc_chttp2_locked_action action,
- void *arg, size_t sizeof_arg) {
- grpc_chttp2_executor_action_header *hdr;
-
- GPR_TIMER_BEGIN("grpc_chttp2_run_with_global_lock", 0);
-
- REF_TRANSPORT(t, "run_global");
- gpr_mu_lock(&t->executor.mu);
-
- for (;;) {
- if (!t->executor.global_active) {
- t->executor.global_active = 1;
- gpr_mu_unlock(&t->executor.mu);
-
- GPR_TIMER_BEGIN("chttp2:locked_action", 0);
- action(exec_ctx, t, optional_stream, arg);
- GPR_TIMER_END("chttp2:locked_action", 0);
-
- finish_global_actions(exec_ctx, t);
- } else {
- gpr_mu_unlock(&t->executor.mu);
-
- hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg);
- hdr->stream = optional_stream;
- hdr->action = action;
- if (sizeof_arg == 0) {
- hdr->arg = arg;
- } else {
- hdr->arg = hdr + 1;
- memcpy(hdr->arg, arg, sizeof_arg);
- }
-
- gpr_mu_lock(&t->executor.mu);
- if (!t->executor.global_active) {
- /* global lock was released while allocating memory: release & retry */
- gpr_free(hdr);
- continue;
- }
- hdr->next = NULL;
- if (t->executor.pending_actions_head != NULL) {
- t->executor.pending_actions_tail =
- t->executor.pending_actions_tail->next = hdr;
- } else {
- t->executor.pending_actions_tail = t->executor.pending_actions_head =
- hdr;
- }
- REF_TRANSPORT(t, "pending_action");
- gpr_mu_unlock(&t->executor.mu);
- }
- break;
- }
-
- UNREF_TRANSPORT(exec_ctx, t, "run_global");
-
- GPR_TIMER_END("grpc_chttp2_run_with_global_lock", 0);
+static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ check_read_ops(exec_ctx, &t->global);
}
/*******************************************************************************
@@ -803,23 +679,17 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
switch (t->executor.write_state) {
case GRPC_CHTTP2_WRITING_INACTIVE:
- set_write_state(t, covered_by_poller
- ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER
- : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
- reason);
- break;
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- /* nothing to do: write already requested */
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, reason);
+ grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
+ &t->initiate_writing, GRPC_ERROR_NONE,
+ covered_by_poller);
break;
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
if (covered_by_poller) {
/* upgrade to note poller is available to cover the write */
- set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason);
+ grpc_combiner_force_async_finally(t->executor.combiner);
}
break;
- case GRPC_CHTTP2_WRITE_SCHEDULED:
- /* nothing to do: write already scheduled */
- break;
case GRPC_CHTTP2_WRITING:
set_write_state(t,
covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER
@@ -839,8 +709,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
}
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
- GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED ||
- t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER);
+ GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
if (!t->closed &&
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing");
@@ -856,26 +725,9 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
"start_writing:nothing_to_write");
}
end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write"));
- if (t->ep && !t->endpoint_reading) {
- destroy_endpoint(exec_ctx, t);
- }
}
}
-static void initiate_writing_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *arg_ignored) {
- start_writing(exec_ctx, t);
- UNREF_TRANSPORT(exec_ctx, t, "initiate_writing");
-}
-
-static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked,
- NULL, 0);
-}
-
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
@@ -916,12 +768,9 @@ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored,
- void *a) {
- grpc_error *error = a;
-
+static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
allow_endpoint_shutdown_locked(exec_ctx, t);
if (error != GRPC_ERROR_NONE) {
@@ -930,39 +779,37 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
- end_waiting_for_write(exec_ctx, t, error);
+ end_waiting_for_write(exec_ctx, t, GRPC_ERROR_REF(error));
switch (t->executor.write_state) {
case GRPC_CHTTP2_WRITING_INACTIVE:
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
case GRPC_CHTTP2_WRITE_SCHEDULED:
GPR_UNREACHABLE_CODE(break);
case GRPC_CHTTP2_WRITING:
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing");
break;
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
- set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
- "terminate_writing");
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
+ grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
+ &t->initiate_writing, GRPC_ERROR_NONE,
+ true);
break;
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
- set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
- "terminate_writing");
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
+ grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
+ &t->initiate_writing, GRPC_ERROR_NONE,
+ false);
break;
}
- if (t->ep && !t->endpoint_reading) {
- destroy_endpoint(exec_ctx, t);
- }
-
UNREF_TRANSPORT(exec_ctx, t, "writing");
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
void *transport_writing, grpc_error *error) {
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
- grpc_chttp2_run_with_global_lock(
- exec_ctx, t, NULL, terminate_writing_with_lock, GRPC_ERROR_REF(error), 0);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->terminate_writing,
+ GRPC_ERROR_REF(error));
}
static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
@@ -1106,12 +953,13 @@ static int contains_non_ok_status(
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
-static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *stream_op) {
+static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
+ grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
grpc_transport_stream_op *op = stream_op;
+ grpc_chttp2_transport *t = op->transport_private.args[0];
+ grpc_chttp2_stream *s = op->transport_private.args[1];
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global = &s->global;
@@ -1291,9 +1139,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- grpc_chttp2_run_with_global_lock(exec_ctx, t, s, perform_stream_op_locked, op,
- sizeof(*op));
+ grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked,
+ op);
+ op->transport_private.args[0] = gt;
+ op->transport_private.args[1] = gs;
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ &op->transport_private.closure, GRPC_ERROR_NONE);
}
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1315,13 +1166,20 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping");
}
-static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *opaque_8bytes) {
+typedef struct ack_ping_args {
+ grpc_closure closure;
+ grpc_chttp2_transport *t;
+ uint8_t opaque_8bytes[8];
+} ack_ping_args;
+
+static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a,
+ grpc_error *error_ignored) {
+ ack_ping_args *args = a;
grpc_chttp2_outstanding_ping *ping;
- grpc_chttp2_transport_global *transport_global = &t->global;
+ grpc_chttp2_transport_global *transport_global = &args->t->global;
for (ping = transport_global->pings.next; ping != &transport_global->pings;
ping = ping->next) {
- if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
+ if (0 == memcmp(args->opaque_8bytes, ping->id, 8)) {
grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
@@ -1329,21 +1187,25 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
break;
}
}
+ gpr_free(args);
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *transport_parsing,
const uint8_t *opaque_8bytes) {
- grpc_chttp2_run_with_global_lock(
- exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing), NULL,
- ack_ping_locked, (void *)opaque_8bytes, 8);
+ ack_ping_args *args = gpr_malloc(sizeof(*args));
+ args->t = TRANSPORT_FROM_PARSING(transport_parsing);
+ memcpy(args->opaque_8bytes, opaque_8bytes, sizeof(args->opaque_8bytes));
+ grpc_closure_init(&args->closure, ack_ping_locked, args);
+ grpc_combiner_execute(exec_ctx, args->t->executor.combiner, &args->closure,
+ GRPC_ERROR_NONE);
}
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *stream_op) {
+ void *stream_op,
+ grpc_error *error_ignored) {
grpc_transport_op *op = stream_op;
+ grpc_chttp2_transport *t = op->transport_private.args[0];
grpc_error *close_transport = op->disconnect_with_error;
/* If there's a set_accept_stream ensure that we're not parsing
@@ -1355,8 +1217,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
return;
}
- grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
-
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
@@ -1382,11 +1242,11 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->bind_pollset) {
- add_to_pollset_locked(exec_ctx, t, NULL, op->bind_pollset);
+ grpc_endpoint_add_to_pollset(exec_ctx, t->ep, op->bind_pollset);
}
if (op->bind_pollset_set) {
- add_to_pollset_set_locked(exec_ctx, t, NULL, op->bind_pollset_set);
+ grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set);
}
if (op->send_ping) {
@@ -1396,13 +1256,18 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
if (close_transport != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, close_transport);
}
+
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
}
static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_chttp2_run_with_global_lock(
- exec_ctx, t, NULL, perform_transport_op_locked, op, sizeof(*op));
+ op->transport_private.args[0] = gt;
+ grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked,
+ op);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ &op->transport_private.closure, GRPC_ERROR_NONE);
}
/*******************************************************************************
@@ -1420,7 +1285,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
if (stream_global->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(
@@ -1443,7 +1308,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
stream_global->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
if (stream_global->incoming_frames.head != NULL) {
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
@@ -1464,7 +1329,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
if (stream_global->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(
@@ -1905,16 +1770,12 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
* INPUT PROCESSING - PARSING
*/
-static void reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
-static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg);
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg);
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
@@ -1922,16 +1783,16 @@ static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
reading_action_locked ->
(parse_unlocked -> post_parse_locked)? ->
post_reading_action_locked */
- grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked,
- GRPC_ERROR_REF(error), 0);
+ grpc_chttp2_transport *t = tp;
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ &t->reading_action_locked, GRPC_ERROR_REF(error));
}
-static void reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg) {
+static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
- grpc_error *error = arg;
GPR_ASSERT(!t->executor.parsing_active);
if (!t->closed) {
@@ -1942,7 +1803,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL);
} else {
- post_reading_action_locked(exec_ctx, t, s_unused, arg);
+ post_reading_action_locked(exec_ctx, t, GRPC_ERROR_REF(error));
}
}
@@ -1997,12 +1858,13 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_UNREF(errors[i]);
}
GPR_TIMER_END("reading_action.parse", 0);
- grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, err,
- 0);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->post_parse_locked,
+ err);
}
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg) {
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
/* copy parsing qbuf to global qbuf */
@@ -2028,7 +1890,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (t->post_parsing_op) {
grpc_transport_op *op = t->post_parsing_op;
t->post_parsing_op = NULL;
- perform_transport_op_locked(exec_ctx, t, NULL, op);
+ perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE);
gpr_free(op);
}
/* if a stream is in the stream map, and gets cancelled, we need to
@@ -2045,15 +1907,14 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
- post_reading_action_locked(exec_ctx, t, s_unused, arg);
+ post_reading_action_locked(exec_ctx, t, error);
}
-static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *arg) {
- grpc_error *error = arg;
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = arg;
bool keep_reading = false;
+ GRPC_ERROR_REF(error);
if (error == GRPC_ERROR_NONE && t->closed) {
error = GRPC_ERROR_CREATE("Transport closed");
}
@@ -2068,16 +1929,12 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t,
write_state_name(t->executor.write_state));
}
- if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) {
- destroy_endpoint(exec_ctx, t);
- }
} else if (!t->closed) {
keep_reading = true;
REF_TRANSPORT(t, "keep_reading");
prevent_endpoint_shutdown(t);
}
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
- GRPC_ERROR_UNREF(error);
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action);
@@ -2086,6 +1943,7 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
} else {
UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}
+ GRPC_ERROR_UNREF(error);
}
/*******************************************************************************
@@ -2107,36 +1965,16 @@ static void connectivity_state_set(
* POLLSET STUFF
*/
-static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *pollset) {
- if (t->ep) {
- grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
- }
-}
-
-static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *pollset_set) {
- if (t->ep) {
- grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
- }
-}
-
static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset *pollset) {
- /* TODO(ctiller): keep pollset alive */
- grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
- (grpc_chttp2_stream *)gs,
- add_to_pollset_locked, pollset, 0);
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+ grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
}
static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset_set *pollset_set) {
- grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
- (grpc_chttp2_stream *)gs,
- add_to_pollset_set_locked, pollset_set, 0);
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+ grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
}
/*******************************************************************************
@@ -2148,6 +1986,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
if (gpr_unref(&bs->refs)) {
GRPC_ERROR_UNREF(bs->error);
gpr_slice_buffer_destroy(&bs->slices);
+ gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
}
}
@@ -2193,38 +2032,31 @@ static void incoming_byte_stream_update_flow_control(
}
}
-typedef struct {
- grpc_chttp2_incoming_byte_stream *byte_stream;
- gpr_slice *slice;
- size_t max_size_hint;
- grpc_closure *on_complete;
-} incoming_byte_stream_next_arg;
-
static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *argp) {
- incoming_byte_stream_next_arg *arg = argp;
- grpc_chttp2_incoming_byte_stream *bs =
- (grpc_chttp2_incoming_byte_stream *)arg->byte_stream;
+ void *argp,
+ grpc_error *error_ignored) {
+ grpc_chttp2_incoming_byte_stream *bs = argp;
grpc_chttp2_transport_global *transport_global = &bs->transport->global;
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
if (bs->is_tail) {
- incoming_byte_stream_update_flow_control(exec_ctx, transport_global,
- stream_global, arg->max_size_hint,
- bs->slices.length);
+ incoming_byte_stream_update_flow_control(
+ exec_ctx, transport_global, stream_global,
+ bs->next_action.max_size_hint, bs->slices.length);
}
+ gpr_mu_lock(&bs->slice_mu);
if (bs->slices.count > 0) {
- *arg->slice = gpr_slice_buffer_take_first(&bs->slices);
- grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL);
- } else if (bs->error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error),
+ *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices);
+ grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE,
NULL);
+ } else if (bs->error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete,
+ GRPC_ERROR_REF(bs->error), NULL);
} else {
- bs->on_next = arg->on_complete;
- bs->next = arg->slice;
+ bs->on_next = bs->next_action.on_complete;
+ bs->next = bs->next_action.slice;
}
+ gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_unref(exec_ctx, bs);
}
@@ -2234,11 +2066,14 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_closure *on_complete) {
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete};
gpr_ref(&bs->refs);
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_next_locked, &arg,
- sizeof(arg));
+ bs->next_action.slice = slice;
+ bs->next_action.max_size_hint = max_size_hint;
+ bs->next_action.on_complete = on_complete;
+ grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked,
+ bs);
+ grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
+ &bs->next_action.closure, GRPC_ERROR_NONE);
return 0;
}
@@ -2246,9 +2081,8 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *byte_stream) {
+ void *byte_stream,
+ grpc_error *error_ignored) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
decrement_active_streams_locked(exec_ctx, &bs->transport->global,
@@ -2260,8 +2094,10 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_destroy_locked, bs, 0);
+ grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked,
+ bs);
+ grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
+ &bs->destroy_action, GRPC_ERROR_NONE);
}
typedef struct {
@@ -2269,64 +2105,29 @@ typedef struct {
gpr_slice slice;
} incoming_byte_stream_push_arg;
-static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *argp) {
- incoming_byte_stream_push_arg *arg = argp;
- grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream;
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ gpr_slice slice) {
+ gpr_mu_lock(&bs->slice_mu);
if (bs->on_next != NULL) {
- *bs->next = arg->slice;
+ *bs->next = slice;
grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
bs->on_next = NULL;
} else {
- gpr_slice_buffer_add(&bs->slices, arg->slice);
+ gpr_slice_buffer_add(&bs->slices, slice);
}
- incoming_byte_stream_unref(exec_ctx, bs);
-}
-
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs,
- gpr_slice slice) {
- incoming_byte_stream_push_arg arg = {bs, slice};
- gpr_ref(&bs->refs);
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_push_locked, &arg,
- sizeof(arg));
-}
-
-typedef struct {
- grpc_chttp2_incoming_byte_stream *bs;
- grpc_error *error;
-} bs_fail_args;
-
-static bs_fail_args *make_bs_fail_args(grpc_chttp2_incoming_byte_stream *bs,
- grpc_error *error) {
- bs_fail_args *a = gpr_malloc(sizeof(*a));
- a->bs = bs;
- a->error = error;
- return a;
-}
-
-static void incoming_byte_stream_finished_failed_locked(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- void *argp) {
- bs_fail_args *a = argp;
- grpc_chttp2_incoming_byte_stream *bs = a->bs;
- grpc_error *error = a->error;
- gpr_free(a);
- grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
- bs->on_next = NULL;
- GRPC_ERROR_UNREF(bs->error);
- bs->error = error;
- incoming_byte_stream_unref(exec_ctx, bs);
+ gpr_mu_unlock(&bs->slice_mu);
}
-static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *argp) {
- grpc_chttp2_incoming_byte_stream *bs = argp;
+static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx,
+ void *bsp, grpc_error *error) {
+ grpc_chttp2_incoming_byte_stream *bs = bsp;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
+ bs->on_next = NULL;
+ GRPC_ERROR_UNREF(bs->error);
+ bs->error = error;
+ }
incoming_byte_stream_unref(exec_ctx, bs);
}
@@ -2334,24 +2135,12 @@ void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error, int from_parsing_thread) {
if (from_parsing_thread) {
- if (error == GRPC_ERROR_NONE) {
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_finished_ok_locked,
- bs, 0);
- } else {
- grpc_chttp2_run_with_global_lock(
- exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_finished_failed_locked,
- make_bs_fail_args(bs, error), 0);
- }
+ grpc_closure_init(&bs->finished_action,
+ incoming_byte_stream_finished_locked, bs);
+ grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
+ &bs->finished_action, GRPC_ERROR_REF(error));
} else {
- if (error == GRPC_ERROR_NONE) {
- incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport,
- bs->stream, bs);
- } else {
- incoming_byte_stream_finished_failed_locked(
- exec_ctx, bs->transport, bs->stream, make_bs_fail_args(bs, error));
- }
+ incoming_byte_stream_finished_locked(exec_ctx, bs, error);
}
}
@@ -2365,6 +2154,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.next = incoming_byte_stream_next;
incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
+ gpr_mu_init(&incoming_byte_stream->slice_mu);
gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 81ddaa8fa1..efac10a93c 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -48,6 +48,7 @@
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -161,9 +162,20 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_chttp2_transport *transport;
grpc_chttp2_stream *stream;
int is_tail;
+
+ gpr_mu slice_mu; // protects slices, on_next
gpr_slice_buffer slices;
grpc_closure *on_next;
gpr_slice *next;
+
+ struct {
+ grpc_closure closure;
+ gpr_slice *slice;
+ size_t max_size_hint;
+ grpc_closure *on_complete;
+ } next_action;
+ grpc_closure destroy_action;
+ grpc_closure finished_action;
};
typedef struct {
@@ -294,23 +306,9 @@ struct grpc_chttp2_transport_parsing {
int64_t outgoing_window;
};
-typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *arg);
-
-typedef struct grpc_chttp2_executor_action_header {
- grpc_chttp2_stream *stream;
- grpc_chttp2_locked_action action;
- struct grpc_chttp2_executor_action_header *next;
- void *arg;
-} grpc_chttp2_executor_action_header;
-
typedef enum {
/** no writing activity */
GRPC_CHTTP2_WRITING_INACTIVE,
- /** write has been requested, but not scheduled yet */
- GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
- GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
/** write has been requested and scheduled against the workqueue */
GRPC_CHTTP2_WRITE_SCHEDULED,
/** write has been initiated after being reaped from the workqueue */
@@ -331,7 +329,7 @@ struct grpc_chttp2_transport {
gpr_refcount shutdown_ep_refs;
struct {
- gpr_mu mu;
+ grpc_combiner *combiner;
/** is a thread currently in the global lock */
bool global_active;
@@ -339,9 +337,6 @@ struct grpc_chttp2_transport {
bool parsing_active;
/** write execution state of the transport */
grpc_chttp2_write_state write_state;
-
- grpc_chttp2_executor_action_header *pending_actions_head;
- grpc_chttp2_executor_action_header *pending_actions_tail;
} executor;
/** is the transport destroying itself? */
@@ -377,10 +372,14 @@ struct grpc_chttp2_transport {
grpc_closure writing_action;
/** closure to start reading from the endpoint */
grpc_closure reading_action;
+ grpc_closure reading_action_locked;
+ grpc_closure post_parse_locked;
/** closure to actually do parsing */
grpc_closure parsing_action;
/** closure to initiate writing */
grpc_closure initiate_writing;
+ /** closure to finish writing */
+ grpc_closure terminate_writing;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@@ -522,11 +521,16 @@ struct grpc_chttp2_stream_parsing {
};
struct grpc_chttp2_stream {
+ grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
grpc_chttp2_stream_global global;
grpc_chttp2_stream_writing writing;
grpc_chttp2_stream_parsing parsing;
+ grpc_closure init_stream;
+ grpc_closure destroy_stream;
+ void *destroy_stream_arg;
+
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
uint8_t included[STREAM_LIST_COUNT];
};
@@ -698,12 +702,6 @@ void grpc_chttp2_complete_closure_step(
grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure,
grpc_error *error);
-void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *transport,
- grpc_chttp2_stream *optional_stream,
- grpc_chttp2_locked_action action,
- void *arg, size_t sizeof_arg);
-
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 34e8f43905..5dfef4f25b 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -187,3 +187,7 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
}
grpc_closure_list_append(&lock->final_list, closure, error);
}
+
+void grpc_combiner_force_async_finally(grpc_combiner *lock) {
+ lock->take_async_break_before_final_list = true;
+}
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index d6bc27111f..3554202aae 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -63,5 +63,6 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error,
bool force_async_break);
+void grpc_combiner_force_async_finally(grpc_combiner *lock);
#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 08c0a237c9..b04b60ec3d 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -96,6 +96,11 @@ void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from,
void grpc_transport_move_stats(grpc_transport_stream_stats *from,
grpc_transport_stream_stats *to);
+typedef struct {
+ grpc_closure closure;
+ void *args[2];
+} grpc_transport_private_op_data;
+
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
@@ -144,6 +149,12 @@ typedef struct grpc_transport_stream_op {
/* Indexes correspond to grpc_context_index enum values */
grpc_call_context_element *context;
+
+ /***************************************************************************
+ * remaining fields are initialized and used at the discretion of the
+ * transport implementation */
+
+ grpc_transport_private_op_data transport_private;
} grpc_transport_stream_op;
/** Transport op: a set of operations to perform on a transport as a whole */
@@ -177,6 +188,12 @@ typedef struct grpc_transport_op {
grpc_pollset_set *bind_pollset_set;
/** send a ping, call this back if not NULL */
grpc_closure *send_ping;
+
+ /***************************************************************************
+ * remaining fields are initialized and used at the discretion of the
+ * transport implementation */
+
+ grpc_transport_private_op_data transport_private;
} grpc_transport_op;
/* Returns the amount of memory required to store a grpc_stream for this