aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/client_config/client_channel.c14
-rw-r--r--src/core/ext/client_config/subchannel.c18
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c766
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h54
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c12
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c9
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c21
-rw-r--r--src/core/lib/channel/channel_stack.c23
-rw-r--r--src/core/lib/channel/compress_filter.c14
-rw-r--r--src/core/lib/iomgr/closure.c4
-rw-r--r--src/core/lib/iomgr/closure.h18
-rw-r--r--src/core/lib/iomgr/combiner.c293
-rw-r--r--src/core/lib/iomgr/combiner.h71
-rw-r--r--src/core/lib/iomgr/error.c2
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c4
-rw-r--r--src/core/lib/iomgr/exec_ctx.h6
-rw-r--r--src/core/lib/iomgr/tcp_posix.c17
-rw-r--r--src/core/lib/iomgr/workqueue.h4
-rw-r--r--src/core/lib/iomgr/workqueue_posix.c97
-rw-r--r--src/core/lib/iomgr/workqueue_posix.h9
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c2
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c5
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c5
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c27
-rw-r--r--src/core/lib/support/mpscq.c83
-rw-r--r--src/core/lib/support/mpscq.h65
-rw-r--r--src/core/lib/surface/call.c63
-rw-r--r--src/core/lib/surface/channel.c7
-rw-r--r--src/core/lib/surface/channel_ping.c11
-rw-r--r--src/core/lib/surface/init.c2
-rw-r--r--src/core/lib/surface/lame_client.c6
-rw-r--r--src/core/lib/surface/server.c69
-rw-r--r--src/core/lib/transport/transport.c27
-rw-r--r--src/core/lib/transport/transport.h21
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py2
35 files changed, 649 insertions, 1202 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index be333f4e0d..61e012578e 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -387,7 +387,7 @@ typedef struct client_channel_call_data {
grpc_connected_subchannel *connected_subchannel;
grpc_polling_entity *pollent;
- grpc_transport_stream_op **waiting_ops;
+ grpc_transport_stream_op *waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
@@ -404,7 +404,7 @@ static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
gpr_realloc(calld->waiting_ops,
calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
}
- calld->waiting_ops[calld->waiting_ops_count++] = op;
+ calld->waiting_ops[calld->waiting_ops_count++] = *op;
GPR_TIMER_END("add_waiting_locked", 0);
}
@@ -413,14 +413,14 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
size_t i;
for (i = 0; i < calld->waiting_ops_count; i++) {
grpc_transport_stream_op_finish_with_failure(
- exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
+ exec_ctx, &calld->waiting_ops[i], GRPC_ERROR_REF(error));
}
calld->waiting_ops_count = 0;
GRPC_ERROR_UNREF(error);
}
typedef struct {
- grpc_transport_stream_op **ops;
+ grpc_transport_stream_op *ops;
size_t nops;
grpc_subchannel_call *call;
} retry_ops_args;
@@ -429,7 +429,7 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]);
+ grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
}
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
gpr_free(a->ops);
@@ -437,10 +437,6 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
}
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
- if (calld->waiting_ops_count == 0) {
- return;
- }
-
retry_ops_args *a = gpr_malloc(sizeof(*a));
a->ops = calld->waiting_ops;
a->nops = calld->waiting_ops_count;
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 2c4364b259..df35904b85 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -504,13 +504,14 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *closure) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_transport_op op;
grpc_channel_element *elem;
- op->connectivity_state = state;
- op->on_connectivity_state_change = closure;
- op->bind_pollset_set = interested_parties;
+ memset(&op, 0, sizeof(op));
+ op.connectivity_state = state;
+ op.on_connectivity_state_change = closure;
+ op.bind_pollset_set = interested_parties;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, op);
+ elem->filter->start_transport_op(exec_ctx, elem, &op);
}
void grpc_connected_subchannel_notify_on_state_change(
@@ -524,11 +525,12 @@ void grpc_connected_subchannel_notify_on_state_change(
void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *con,
grpc_closure *closure) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_transport_op op;
grpc_channel_element *elem;
- op->send_ping = closure;
+ memset(&op, 0, sizeof(op));
+ op.send_ping = closure;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, op);
+ elem->filter->start_transport_op(exec_ctx, elem, &op);
}
static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 53da0e5d70..00999e3b94 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -89,20 +89,13 @@ static const grpc_transport_vtable vtable;
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
-static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
-static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
-static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
+static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_error *error);
+ grpc_chttp2_transport *t, grpc_error *error,
+ const char *reason);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -112,6 +105,11 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
+/** Perform a transport_op */
+static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *transport_op);
+
/** Cancel a stream: coming from the transport API */
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
@@ -123,10 +121,22 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global,
grpc_error *error);
+/** Add endpoint from this transport to pollset */
+static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_ignored, void *pollset);
+static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_ignored,
+ void *pollset_set);
+
/** Start new streams that have been created if we can */
static void maybe_start_some_streams(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
+static void finish_global_actions(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
+
static void connectivity_state_set(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_connectivity_state state, grpc_error *error, const char *reason);
@@ -139,16 +149,14 @@ static void incoming_byte_stream_update_flow_control(
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
- void *byte_stream,
- grpc_error *error_ignored);
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *byte_stream);
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_error *error);
-static void set_write_state(grpc_chttp2_transport *t,
- grpc_chttp2_write_state state, const char *reason);
-
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -157,7 +165,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
size_t i;
- grpc_endpoint_destroy(exec_ctx, t->ep);
+ gpr_mu_lock(&t->executor.mu);
+
+ GPR_ASSERT(t->ep == NULL);
gpr_slice_buffer_destroy(&t->global.qbuf);
@@ -181,7 +191,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
- grpc_combiner_destroy(exec_ctx, t->executor.combiner);
+ gpr_mu_unlock(&t->executor.mu);
+ gpr_mu_destroy(&t->executor.mu);
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
@@ -239,13 +250,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
memset(t, 0, sizeof(*t));
t->base.vtable = &vtable;
- t->executor.write_state = GRPC_CHTTP2_WRITES_CORKED;
t->ep = ep;
- /* one ref is for destroy */
- gpr_ref_init(&t->refs, 1);
+ /* one ref is for destroy, the other for when ep becomes NULL */
+ gpr_ref_init(&t->refs, 2);
/* ref is dropped at transport close() */
gpr_ref_init(&t->shutdown_ep_refs, 1);
- t->executor.combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep));
+ gpr_mu_init(&t->executor.mu);
t->peer_string = grpc_endpoint_get_peer(ep);
t->endpoint_reading = 1;
t->global.next_stream_id = is_client ? 1 : 2;
@@ -271,22 +281,23 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor);
grpc_closure_init(&t->writing_action, writing_action, t);
grpc_closure_init(&t->reading_action, reading_action, t);
- grpc_closure_init(&t->reading_action_locked, reading_action_locked, t);
grpc_closure_init(&t->parsing_action, parsing_action, t);
- grpc_closure_init(&t->post_parse_locked, post_parse_locked, t);
- grpc_closure_init(&t->initiate_writing, initiate_writing_locked, t);
- grpc_closure_init(&t->terminate_writing, terminate_writing_with_lock, t);
- grpc_closure_init(&t->initiate_read_flush_locked, initiate_read_flush_locked,
- t);
- grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
- &t->writing);
+ grpc_closure_init(&t->initiate_writing, initiate_writing, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser);
+ grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
+ &t->writing);
gpr_slice_buffer_init(&t->read_buffer);
+ if (is_client) {
+ gpr_slice_buffer_add(
+ &t->global.qbuf,
+ gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write");
+ }
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet
large enough that the exponential growth should happen nicely when it's
@@ -309,13 +320,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->global.force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->global.sent_local_settings = 0;
- if (is_client) {
- gpr_slice_buffer_add(
- &t->writing.outbuf,
- gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
- grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write");
- }
-
/* configure http2 the way we like it */
if (is_client) {
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
@@ -420,39 +424,47 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
}
-
- set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "uncork");
- grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "init");
}
-static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- grpc_chttp2_transport *t = tp;
+static void destroy_transport_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_ignored,
+ void *arg_ignored) {
t->destroying = 1;
drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
- UNREF_TRANSPORT(exec_ctx, t, "destroy");
}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_combiner_execute(exec_ctx, t->executor.combiner,
- grpc_closure_create(destroy_transport_locked, t),
- GRPC_ERROR_NONE);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, destroy_transport_locked,
+ NULL, 0);
+ UNREF_TRANSPORT(exec_ctx, t, "destroy");
}
/** block grpc_endpoint_shutdown being called until a paired
allow_endpoint_shutdown is made */
static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
+ GPR_ASSERT(t->ep);
gpr_ref(&t->shutdown_ep_refs);
}
static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
if (gpr_unref(&t->shutdown_ep_refs)) {
- grpc_endpoint_shutdown(exec_ctx, t->ep);
+ if (t->ep) {
+ grpc_endpoint_shutdown(exec_ctx, t->ep);
+ }
}
}
+static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ grpc_endpoint_destroy(exec_ctx, t->ep);
+ t->ep = NULL;
+ /* safe because we'll still have the ref for write */
+ UNREF_TRANSPORT(exec_ctx, t, "disconnect");
+}
+
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
@@ -463,7 +475,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
t->closed = 1;
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
- allow_endpoint_shutdown_locked(exec_ctx, t);
+ if (t->ep) {
+ allow_endpoint_shutdown_locked(exec_ctx, t);
+ }
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream_global *stream_global;
@@ -497,23 +511,21 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
}
#endif
-static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
- grpc_error *error) {
- grpc_chttp2_stream *s = sp;
- grpc_chttp2_register_stream(s->t, s);
- GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "init");
+static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *arg_ignored) {
+ grpc_chttp2_register_stream(t, s);
}
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data) {
- GPR_TIMER_BEGIN("init_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
memset(s, 0, sizeof(*s));
- s->t = t;
s->refcount = refcount;
/* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively
@@ -548,21 +560,16 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->global.in_stream_map = true;
}
- grpc_closure_init(&s->init_stream, finish_init_stream_locked, s);
- GRPC_CHTTP2_STREAM_REF(&s->global, "init");
- grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->init_stream,
- GRPC_ERROR_NONE);
-
- GPR_TIMER_END("init_stream", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked,
+ NULL, 0);
return 0;
}
-static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
- grpc_error *error) {
+static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *arg) {
grpc_byte_stream *bs;
- grpc_chttp2_stream *s = sp;
- grpc_chttp2_transport *t = s->t;
GPR_TIMER_BEGIN("destroy_stream", 0);
@@ -581,7 +588,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
@@ -618,20 +625,16 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_TIMER_END("destroy_stream", 0);
- gpr_free(s->destroy_stream_arg);
+ gpr_free(arg);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, void *and_free_memory) {
- GPR_TIMER_BEGIN("destroy_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- s->destroy_stream_arg = and_free_memory;
- grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s);
- grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->destroy_stream,
- GRPC_ERROR_NONE);
- GPR_TIMER_END("destroy_stream", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked,
+ and_free_memory, 0);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@@ -662,10 +665,12 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
static const char *write_state_name(grpc_chttp2_write_state state) {
switch (state) {
- case GRPC_CHTTP2_WRITES_CORKED:
- return "CORKED";
case GRPC_CHTTP2_WRITING_INACTIVE:
return "INACTIVE";
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ return "REQUESTED[p=0]";
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ return "REQUESTED[p=1]";
case GRPC_CHTTP2_WRITE_SCHEDULED:
return "SCHEDULED";
case GRPC_CHTTP2_WRITING:
@@ -688,18 +693,120 @@ static void set_write_state(grpc_chttp2_transport *t,
t->executor.write_state = state;
}
-static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- grpc_chttp2_transport *t = tp;
- GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
- start_writing(exec_ctx, t);
+static void finish_global_actions(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ grpc_chttp2_executor_action_header *hdr;
+ grpc_chttp2_executor_action_header *next;
+
+ GPR_TIMER_BEGIN("finish_global_actions", 0);
+
+ for (;;) {
+ check_read_ops(exec_ctx, &t->global);
+
+ gpr_mu_lock(&t->executor.mu);
+ if (t->executor.pending_actions_head != NULL) {
+ hdr = t->executor.pending_actions_head;
+ t->executor.pending_actions_head = t->executor.pending_actions_tail =
+ NULL;
+ gpr_mu_unlock(&t->executor.mu);
+ while (hdr != NULL) {
+ GPR_TIMER_BEGIN("chttp2:locked_action", 0);
+ hdr->action(exec_ctx, t, hdr->stream, hdr->arg);
+ GPR_TIMER_END("chttp2:locked_action", 0);
+ next = hdr->next;
+ gpr_free(hdr);
+ UNREF_TRANSPORT(exec_ctx, t, "pending_action");
+ hdr = next;
+ }
+ continue;
+ } else {
+ t->executor.global_active = false;
+ switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking");
+ REF_TRANSPORT(t, "initiate_writing");
+ gpr_mu_unlock(&t->executor.mu);
+ grpc_exec_ctx_sched(
+ exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE,
+ t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL);
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ start_writing(exec_ctx, t);
+ gpr_mu_unlock(&t->executor.mu);
+ break;
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ case GRPC_CHTTP2_WRITING:
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ gpr_mu_unlock(&t->executor.mu);
+ break;
+ }
+ }
+ break;
+ }
+
+ GPR_TIMER_END("finish_global_actions", 0);
}
-static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- grpc_chttp2_transport *t = tp;
- t->executor.check_read_ops_scheduled = false;
- check_read_ops(exec_ctx, &t->global);
+void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *optional_stream,
+ grpc_chttp2_locked_action action,
+ void *arg, size_t sizeof_arg) {
+ grpc_chttp2_executor_action_header *hdr;
+
+ GPR_TIMER_BEGIN("grpc_chttp2_run_with_global_lock", 0);
+
+ REF_TRANSPORT(t, "run_global");
+ gpr_mu_lock(&t->executor.mu);
+
+ for (;;) {
+ if (!t->executor.global_active) {
+ t->executor.global_active = 1;
+ gpr_mu_unlock(&t->executor.mu);
+
+ GPR_TIMER_BEGIN("chttp2:locked_action", 0);
+ action(exec_ctx, t, optional_stream, arg);
+ GPR_TIMER_END("chttp2:locked_action", 0);
+
+ finish_global_actions(exec_ctx, t);
+ } else {
+ gpr_mu_unlock(&t->executor.mu);
+
+ hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg);
+ hdr->stream = optional_stream;
+ hdr->action = action;
+ if (sizeof_arg == 0) {
+ hdr->arg = arg;
+ } else {
+ hdr->arg = hdr + 1;
+ memcpy(hdr->arg, arg, sizeof_arg);
+ }
+
+ gpr_mu_lock(&t->executor.mu);
+ if (!t->executor.global_active) {
+ /* global lock was released while allocating memory: release & retry */
+ gpr_free(hdr);
+ continue;
+ }
+ hdr->next = NULL;
+ if (t->executor.pending_actions_head != NULL) {
+ t->executor.pending_actions_tail =
+ t->executor.pending_actions_tail->next = hdr;
+ } else {
+ t->executor.pending_actions_tail = t->executor.pending_actions_head =
+ hdr;
+ }
+ REF_TRANSPORT(t, "pending_action");
+ gpr_mu_unlock(&t->executor.mu);
+ }
+ break;
+ }
+
+ UNREF_TRANSPORT(exec_ctx, t, "run_global");
+
+ GPR_TIMER_END("grpc_chttp2_run_with_global_lock", 0);
}
/*******************************************************************************
@@ -709,12 +816,11 @@ static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp,
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
bool covered_by_poller, const char *reason) {
- GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0);
-
/* Perform state checks, and transition to a scheduled state if appropriate.
- If we are inactive, schedule a write chain to begin once the transport
- combiner finishes any executions in its current batch (which may be
- scheduled AFTER this code executes). The write chain will:
+ Each time we finish the global lock execution, we check if we need to
+ write. If we do:
+ - (if there is a poller surrounding the write) schedule
+ initiate_writing, which locks and calls initiate_writing_locked to...
- call start_writing, which verifies (under the global lock) that there
are things that need to be written by calling
grpc_chttp2_unlocking_check_writes, and if so schedules writing_action
@@ -724,28 +830,31 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
to do *another* write immediately, and if so loops back to
start_writing.
- Current problems:
+ Current problems:
- too much lock entry/exiting
- the writing thread can become stuck indefinitely (punt through the
workqueue periodically to fix) */
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
switch (t->executor.write_state) {
- case GRPC_CHTTP2_WRITES_CORKED:
- break;
case GRPC_CHTTP2_WRITING_INACTIVE:
- set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, reason);
- REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
- &t->initiate_writing, GRPC_ERROR_NONE,
- covered_by_poller);
+ set_write_state(t, covered_by_poller
+ ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER
+ : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ reason);
break;
- case GRPC_CHTTP2_WRITE_SCHEDULED:
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ /* nothing to do: write already requested */
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
if (covered_by_poller) {
/* upgrade to note poller is available to cover the write */
- grpc_combiner_force_async_finally(t->executor.combiner);
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason);
}
break;
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ /* nothing to do: write already scheduled */
+ break;
case GRPC_CHTTP2_WRITING:
set_write_state(t,
covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER
@@ -762,15 +871,15 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
}
break;
}
- GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
- GPR_TIMER_BEGIN("start_writing", 0);
- GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
+ GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED ||
+ t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER);
if (!t->closed &&
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing");
+ REF_TRANSPORT(t, "writing");
prevent_endpoint_shutdown(t);
grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
} else {
@@ -781,10 +890,25 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE,
"start_writing:nothing_to_write");
}
- end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE);
- UNREF_TRANSPORT(exec_ctx, t, "writing");
+ end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE, "Nothing to write");
+ if (t->ep && !t->endpoint_reading) {
+ destroy_endpoint(exec_ctx, t);
+ }
}
- GPR_TIMER_END("start_writing", 0);
+}
+
+static void initiate_writing_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *arg_ignored) {
+ start_writing(exec_ctx, t);
+ UNREF_TRANSPORT(exec_ctx, t, "initiate_writing");
+}
+
+static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked,
+ NULL, 0);
}
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
@@ -818,10 +942,15 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* error may be GRPC_ERROR_NONE if there is no error allocated yet.
In that case, use "reason" as the text for a new error. */
static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_error *error) {
+ grpc_chttp2_transport *t, grpc_error *error,
+ const char *reason) {
grpc_chttp2_stream_global *stream_global;
while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
&stream_global)) {
+ if (error == GRPC_ERROR_NONE && reason != NULL) {
+ /* create error object. */
+ error = GRPC_ERROR_CREATE(reason);
+ }
fail_pending_writes(exec_ctx, &t->global, stream_global,
GRPC_ERROR_REF(error));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
@@ -829,10 +958,12 @@ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- GPR_TIMER_BEGIN("terminate_writing_with_lock", 0);
- grpc_chttp2_transport *t = tp;
+static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_ignored,
+ void *a) {
+ grpc_error *error = a;
+
allow_endpoint_shutdown_locked(exec_ctx, t);
if (error != GRPC_ERROR_NONE) {
@@ -841,46 +972,39 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
- end_waiting_for_write(exec_ctx, t, GRPC_ERROR_REF(error));
+ end_waiting_for_write(exec_ctx, t, error, NULL);
switch (t->executor.write_state) {
- case GRPC_CHTTP2_WRITES_CORKED:
case GRPC_CHTTP2_WRITING_INACTIVE:
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
case GRPC_CHTTP2_WRITE_SCHEDULED:
GPR_UNREACHABLE_CODE(break);
case GRPC_CHTTP2_WRITING:
- GPR_TIMER_MARK("state=writing", 0);
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing");
break;
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
- GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
- set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
- REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
- &t->initiate_writing, GRPC_ERROR_NONE,
- true);
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
+ "terminate_writing");
break;
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
- GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
- set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
- REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
- &t->initiate_writing, GRPC_ERROR_NONE,
- false);
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ "terminate_writing");
break;
}
+ if (t->ep && !t->endpoint_reading) {
+ destroy_endpoint(exec_ctx, t);
+ }
+
UNREF_TRANSPORT(exec_ctx, t, "writing");
- GPR_TIMER_END("terminate_writing_with_lock", 0);
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
void *transport_writing, grpc_error *error) {
- GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0);
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
- grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->terminate_writing,
- GRPC_ERROR_REF(error));
- GPR_TIMER_END("grpc_chttp2_terminate_writing", 0);
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, t, NULL, terminate_writing_with_lock, GRPC_ERROR_REF(error), 0);
}
static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
@@ -1024,22 +1148,15 @@ static int contains_non_ok_status(
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
-static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
- grpc_error *error_ignored) {
+static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *stream_op) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
grpc_transport_stream_op *op = stream_op;
- grpc_chttp2_transport *t = op->transport_private.args[0];
- grpc_chttp2_stream *s = op->transport_private.args[1];
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global = &s->global;
- if (grpc_http_trace) {
- char *str = grpc_transport_stream_op_string(op);
- gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s", str);
- gpr_free(str);
- }
-
grpc_closure *on_complete = op->on_complete;
if (on_complete == NULL) {
on_complete = grpc_closure_create(do_nothing, NULL);
@@ -1094,8 +1211,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else {
if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (!stream_global->write_closed) {
if (transport_global->is_client) {
@@ -1162,8 +1278,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (contains_non_ok_status(transport_global,
op->send_trailing_metadata)) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (stream_global->write_closed) {
stream_global->send_trailing_metadata = NULL;
@@ -1188,8 +1303,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
stream_global->recv_initial_metadata_ready =
op->recv_initial_metadata_ready;
stream_global->recv_initial_metadata = op->recv_initial_metadata;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (op->recv_message != NULL) {
@@ -1203,8 +1317,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
exec_ctx, transport_global, stream_global,
transport_global->stream_lookahead, 0);
}
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (op->recv_trailing_metadata != NULL) {
@@ -1213,30 +1326,21 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
add_closure_barrier(on_complete);
stream_global->recv_trailing_metadata = op->recv_trailing_metadata;
stream_global->final_metadata_requested = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
grpc_chttp2_complete_closure_step(exec_ctx, transport_global, stream_global,
&on_complete, GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op_locked", 0);
- GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "perform_stream_op");
}
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) {
- GPR_TIMER_BEGIN("perform_stream_op", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked,
- op);
- op->transport_private.args[0] = gt;
- op->transport_private.args[1] = gs;
- GRPC_CHTTP2_STREAM_REF(&s->global, "perform_stream_op");
- grpc_combiner_execute(exec_ctx, t->executor.combiner,
- &op->transport_private.closure, GRPC_ERROR_NONE);
- GPR_TIMER_END("perform_stream_op", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, s, perform_stream_op_locked, op,
+ sizeof(*op));
}
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1258,20 +1362,13 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping");
}
-typedef struct ack_ping_args {
- grpc_closure closure;
- grpc_chttp2_transport *t;
- uint8_t opaque_8bytes[8];
-} ack_ping_args;
-
-static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a,
- grpc_error *error_ignored) {
- ack_ping_args *args = a;
+static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *opaque_8bytes) {
grpc_chttp2_outstanding_ping *ping;
- grpc_chttp2_transport_global *transport_global = &args->t->global;
+ grpc_chttp2_transport_global *transport_global = &t->global;
for (ping = transport_global->pings.next; ping != &transport_global->pings;
ping = ping->next) {
- if (0 == memcmp(args->opaque_8bytes, ping->id, 8)) {
+ if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
@@ -1279,27 +1376,21 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a,
break;
}
}
- UNREF_TRANSPORT(exec_ctx, args->t, "ack_ping");
- gpr_free(args);
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *transport_parsing,
const uint8_t *opaque_8bytes) {
- ack_ping_args *args = gpr_malloc(sizeof(*args));
- args->t = TRANSPORT_FROM_PARSING(transport_parsing);
- memcpy(args->opaque_8bytes, opaque_8bytes, sizeof(args->opaque_8bytes));
- grpc_closure_init(&args->closure, ack_ping_locked, args);
- REF_TRANSPORT(args->t, "ack_ping");
- grpc_combiner_execute(exec_ctx, args->t->executor.combiner, &args->closure,
- GRPC_ERROR_NONE);
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing), NULL,
+ ack_ping_locked, (void *)opaque_8bytes, 8);
}
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
- void *stream_op,
- grpc_error *error_ignored) {
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *stream_op) {
grpc_transport_op *op = stream_op;
- grpc_chttp2_transport *t = op->transport_private.args[0];
grpc_error *close_transport = op->disconnect_with_error;
/* If there's a set_accept_stream ensure that we're not parsing
@@ -1311,6 +1402,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
return;
}
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
@@ -1336,11 +1429,11 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->bind_pollset) {
- grpc_endpoint_add_to_pollset(exec_ctx, t->ep, op->bind_pollset);
+ add_to_pollset_locked(exec_ctx, t, NULL, op->bind_pollset);
}
if (op->bind_pollset_set) {
- grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set);
+ add_to_pollset_set_locked(exec_ctx, t, NULL, op->bind_pollset_set);
}
if (op->send_ping) {
@@ -1350,21 +1443,13 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
if (close_transport != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, close_transport);
}
-
- grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
-
- UNREF_TRANSPORT(exec_ctx, t, "transport_op");
}
static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- op->transport_private.args[0] = gt;
- grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked,
- op);
- REF_TRANSPORT(t, "transport_op");
- grpc_combiner_execute(exec_ctx, t->executor.combiner,
- &op->transport_private.closure, GRPC_ERROR_NONE);
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, t, NULL, perform_transport_op_locked, op, sizeof(*op));
}
/*******************************************************************************
@@ -1373,7 +1458,6 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global) {
- GPR_TIMER_BEGIN("check_read_ops", 0);
grpc_chttp2_stream_global *stream_global;
grpc_byte_stream *bs;
while (
@@ -1383,7 +1467,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
if (stream_global->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(
@@ -1406,7 +1490,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
stream_global->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
if (stream_global->incoming_frames.head != NULL) {
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
@@ -1427,7 +1511,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
if (stream_global->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(
@@ -1448,7 +1532,6 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
}
}
- GPR_TIMER_END("check_read_ops", 0);
}
static void decrement_active_streams_locked(
@@ -1456,8 +1539,7 @@ static void decrement_active_streams_locked(
grpc_chttp2_stream_global *stream_global) {
if ((stream_global->all_incoming_byte_streams_finished =
gpr_unref(&stream_global->active_streams))) {
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
}
@@ -1561,8 +1643,7 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
}
if (due_to_error != GRPC_ERROR_NONE && !stream_global->seen_error) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1, due_to_error);
@@ -1574,8 +1655,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
grpc_status_code status, gpr_slice *slice) {
if (status != GRPC_STATUS_OK) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
/* stream_global->recv_trailing_metadata_finished gives us a
last chance replacement: we've received trailing metadata,
@@ -1599,8 +1679,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
grpc_mdstr_from_slice(gpr_slice_ref(*slice))));
}
stream_global->published_trailing_metadata = true;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (slice) {
gpr_slice_unref(*slice);
@@ -1660,8 +1739,7 @@ void grpc_chttp2_mark_stream_closed(
GRPC_ERROR_UNREF(error);
return;
}
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
if (close_reads && !stream_global->read_closed) {
stream_global->read_closed_error = GRPC_ERROR_REF(error);
stream_global->read_closed = true;
@@ -1842,10 +1920,6 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error) {
- if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
- error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAVAILABLE);
- }
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
end_all_the_calls(exec_ctx, t, error);
}
@@ -1881,12 +1955,16 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
* INPUT PROCESSING - PARSING
*/
+static void reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
-static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg);
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg);
static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
@@ -1894,20 +1972,16 @@ static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
reading_action_locked ->
(parse_unlocked -> post_parse_locked)? ->
post_reading_action_locked */
- GPR_TIMER_BEGIN("reading_action", 0);
- grpc_chttp2_transport *t = tp;
- grpc_combiner_execute(exec_ctx, t->executor.combiner,
- &t->reading_action_locked, GRPC_ERROR_REF(error));
- GPR_TIMER_END("reading_action", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked,
+ GRPC_ERROR_REF(error), 0);
}
-static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- GPR_TIMER_BEGIN("reading_action_locked", 0);
-
- grpc_chttp2_transport *t = tp;
+static void reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg) {
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
+ grpc_error *error = arg;
GPR_ASSERT(!t->executor.parsing_active);
if (!t->closed) {
@@ -1916,13 +1990,10 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
- grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, GRPC_ERROR_REF(error),
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL);
} else {
- post_reading_action_locked(exec_ctx, t, error);
+ post_reading_action_locked(exec_ctx, t, s_unused, arg);
}
-
- GPR_TIMER_END("reading_action_locked", 0);
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -1974,15 +2045,13 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
GRPC_ERROR_UNREF(errors[i]);
}
- grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->post_parse_locked,
- err);
GPR_TIMER_END("reading_action.parse", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, err,
+ 0);
}
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- GPR_TIMER_BEGIN("post_parse_locked", 0);
- grpc_chttp2_transport *t = arg;
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *arg) {
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
/* copy parsing qbuf to global qbuf */
@@ -2008,7 +2077,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (t->post_parsing_op) {
grpc_transport_op *op = t->post_parsing_op;
t->post_parsing_op = NULL;
- perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE);
+ perform_transport_op_locked(exec_ctx, t, NULL, op);
gpr_free(op);
}
/* if a stream is in the stream map, and gets cancelled, we need to
@@ -2025,32 +2094,39 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
- post_reading_action_locked(exec_ctx, t, error);
- GPR_TIMER_END("post_parse_locked", 0);
+ post_reading_action_locked(exec_ctx, t, s_unused, arg);
}
-static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- GPR_TIMER_BEGIN("post_reading_action_locked", 0);
- grpc_chttp2_transport *t = arg;
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *arg) {
+ grpc_error *error = arg;
bool keep_reading = false;
- GRPC_ERROR_REF(error);
if (error == GRPC_ERROR_NONE && t->closed) {
error = GRPC_ERROR_CREATE("Transport closed");
}
if (error != GRPC_ERROR_NONE) {
+ if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ }
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
if (grpc_http_write_state_trace) {
gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t,
write_state_name(t->executor.write_state));
}
+ if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) {
+ destroy_endpoint(exec_ctx, t);
+ }
} else if (!t->closed) {
keep_reading = true;
REF_TRANSPORT(t, "keep_reading");
prevent_endpoint_shutdown(t);
}
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
+ GRPC_ERROR_UNREF(error);
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action);
@@ -2059,9 +2135,6 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
} else {
UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}
- GRPC_ERROR_UNREF(error);
-
- GPR_TIMER_END("post_reading_action_locked", 0);
}
/*******************************************************************************
@@ -2083,16 +2156,36 @@ static void connectivity_state_set(
* POLLSET STUFF
*/
+static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused, void *pollset) {
+ if (t->ep) {
+ grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
+ }
+}
+
+static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *pollset_set) {
+ if (t->ep) {
+ grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
+ }
+}
+
static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset *pollset) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
+ /* TODO(ctiller): keep pollset alive */
+ grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
+ (grpc_chttp2_stream *)gs,
+ add_to_pollset_locked, pollset, 0);
}
static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset_set *pollset_set) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
+ grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
+ (grpc_chttp2_stream *)gs,
+ add_to_pollset_set_locked, pollset_set, 0);
}
/*******************************************************************************
@@ -2104,7 +2197,6 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
if (gpr_unref(&bs->refs)) {
GRPC_ERROR_UNREF(bs->error);
gpr_slice_buffer_destroy(&bs->slices);
- gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
}
}
@@ -2150,34 +2242,38 @@ static void incoming_byte_stream_update_flow_control(
}
}
+typedef struct {
+ grpc_chttp2_incoming_byte_stream *byte_stream;
+ gpr_slice *slice;
+ size_t max_size_hint;
+ grpc_closure *on_complete;
+} incoming_byte_stream_next_arg;
+
static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
- void *argp,
- grpc_error *error_ignored) {
- grpc_chttp2_incoming_byte_stream *bs = argp;
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *argp) {
+ incoming_byte_stream_next_arg *arg = argp;
+ grpc_chttp2_incoming_byte_stream *bs =
+ (grpc_chttp2_incoming_byte_stream *)arg->byte_stream;
grpc_chttp2_transport_global *transport_global = &bs->transport->global;
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
if (bs->is_tail) {
- gpr_mu_lock(&bs->slice_mu);
- size_t cur_length = bs->slices.length;
- gpr_mu_unlock(&bs->slice_mu);
- incoming_byte_stream_update_flow_control(
- exec_ctx, transport_global, stream_global,
- bs->next_action.max_size_hint, cur_length);
- }
- gpr_mu_lock(&bs->slice_mu);
+ incoming_byte_stream_update_flow_control(exec_ctx, transport_global,
+ stream_global, arg->max_size_hint,
+ bs->slices.length);
+ }
if (bs->slices.count > 0) {
- *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices);
- grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE,
- NULL);
+ *arg->slice = gpr_slice_buffer_take_first(&bs->slices);
+ grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL);
} else if (bs->error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete,
- GRPC_ERROR_REF(bs->error), NULL);
+ grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error),
+ NULL);
} else {
- bs->on_next = bs->next_action.on_complete;
- bs->next = bs->next_action.slice;
+ bs->on_next = arg->on_complete;
+ bs->next = arg->slice;
}
- gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_unref(exec_ctx, bs);
}
@@ -2185,18 +2281,13 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
gpr_slice *slice, size_t max_size_hint,
grpc_closure *on_complete) {
- GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
+ incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete};
gpr_ref(&bs->refs);
- bs->next_action.slice = slice;
- bs->next_action.max_size_hint = max_size_hint;
- bs->next_action.on_complete = on_complete;
- grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked,
- bs);
- grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
- &bs->next_action.closure, GRPC_ERROR_NONE);
- GPR_TIMER_END("incoming_byte_stream_next", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_next_locked, &arg,
+ sizeof(arg));
return 0;
}
@@ -2204,8 +2295,9 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
- void *byte_stream,
- grpc_error *error_ignored) {
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *byte_stream) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
decrement_active_streams_locked(exec_ctx, &bs->transport->global,
@@ -2215,14 +2307,10 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
- GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked,
- bs);
- grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
- &bs->destroy_action, GRPC_ERROR_NONE);
- GPR_TIMER_END("incoming_byte_stream_destroy", 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_destroy_locked, bs, 0);
}
typedef struct {
@@ -2230,45 +2318,90 @@ typedef struct {
gpr_slice slice;
} incoming_byte_stream_push_arg;
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs,
- gpr_slice slice) {
- gpr_mu_lock(&bs->slice_mu);
+static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *argp) {
+ incoming_byte_stream_push_arg *arg = argp;
+ grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream;
if (bs->on_next != NULL) {
- *bs->next = slice;
+ *bs->next = arg->slice;
grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
bs->on_next = NULL;
} else {
- gpr_slice_buffer_add(&bs->slices, slice);
+ gpr_slice_buffer_add(&bs->slices, arg->slice);
}
- gpr_mu_unlock(&bs->slice_mu);
+ incoming_byte_stream_unref(exec_ctx, bs);
}
-static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx,
- void *bsp, grpc_error *error) {
- grpc_chttp2_incoming_byte_stream *bs = bsp;
- if (error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
- bs->on_next = NULL;
- GRPC_ERROR_UNREF(bs->error);
- bs->error = error;
- }
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ gpr_slice slice) {
+ incoming_byte_stream_push_arg arg = {bs, slice};
+ gpr_ref(&bs->refs);
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_push_locked, &arg,
+ sizeof(arg));
+}
+
+typedef struct {
+ grpc_chttp2_incoming_byte_stream *bs;
+ grpc_error *error;
+} bs_fail_args;
+
+static bs_fail_args *make_bs_fail_args(grpc_chttp2_incoming_byte_stream *bs,
+ grpc_error *error) {
+ bs_fail_args *a = gpr_malloc(sizeof(*a));
+ a->bs = bs;
+ a->error = error;
+ return a;
+}
+
+static void incoming_byte_stream_finished_failed_locked(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ void *argp) {
+ bs_fail_args *a = argp;
+ grpc_chttp2_incoming_byte_stream *bs = a->bs;
+ grpc_error *error = a->error;
+ gpr_free(a);
+ grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
+ bs->on_next = NULL;
+ GRPC_ERROR_UNREF(bs->error);
+ bs->error = error;
+ incoming_byte_stream_unref(exec_ctx, bs);
+}
+
+static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s,
+ void *argp) {
+ grpc_chttp2_incoming_byte_stream *bs = argp;
incoming_byte_stream_unref(exec_ctx, bs);
}
void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error, int from_parsing_thread) {
- GPR_TIMER_BEGIN("grpc_chttp2_incoming_byte_stream_finished", 0);
if (from_parsing_thread) {
- grpc_closure_init(&bs->finished_action,
- incoming_byte_stream_finished_locked, bs);
- grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
- &bs->finished_action, GRPC_ERROR_REF(error));
+ if (error == GRPC_ERROR_NONE) {
+ grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_finished_ok_locked,
+ bs, 0);
+ } else {
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, bs->transport, bs->stream,
+ incoming_byte_stream_finished_failed_locked,
+ make_bs_fail_args(bs, error), 0);
+ }
} else {
- incoming_byte_stream_finished_locked(exec_ctx, bs, error);
+ if (error == GRPC_ERROR_NONE) {
+ incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport,
+ bs->stream, bs);
+ } else {
+ incoming_byte_stream_finished_failed_locked(
+ exec_ctx, bs->transport, bs->stream, make_bs_fail_args(bs, error));
+ }
}
- GPR_TIMER_END("grpc_chttp2_incoming_byte_stream_finished", 0);
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@@ -2281,7 +2414,6 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.next = incoming_byte_stream_next;
incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
- gpr_mu_init(&incoming_byte_stream->slice_mu);
gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 04b788b702..d67c014e54 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -48,7 +48,6 @@
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
-#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -162,20 +161,9 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_chttp2_transport *transport;
grpc_chttp2_stream *stream;
int is_tail;
-
- gpr_mu slice_mu; // protects slices, on_next
gpr_slice_buffer slices;
grpc_closure *on_next;
gpr_slice *next;
-
- struct {
- grpc_closure closure;
- gpr_slice *slice;
- size_t max_size_hint;
- grpc_closure *on_complete;
- } next_action;
- grpc_closure destroy_action;
- grpc_closure finished_action;
};
typedef struct {
@@ -308,11 +296,23 @@ struct grpc_chttp2_transport_parsing {
int64_t outgoing_window;
};
+typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, void *arg);
+
+typedef struct grpc_chttp2_executor_action_header {
+ grpc_chttp2_stream *stream;
+ grpc_chttp2_locked_action action;
+ struct grpc_chttp2_executor_action_header *next;
+ void *arg;
+} grpc_chttp2_executor_action_header;
+
typedef enum {
- /** no writing activity allowed */
- GRPC_CHTTP2_WRITES_CORKED,
/** no writing activity */
GRPC_CHTTP2_WRITING_INACTIVE,
+ /** write has been requested, but not scheduled yet */
+ GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
+ GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
/** write has been requested and scheduled against the workqueue */
GRPC_CHTTP2_WRITE_SCHEDULED,
/** write has been initiated after being reaped from the workqueue */
@@ -333,7 +333,7 @@ struct grpc_chttp2_transport {
gpr_refcount shutdown_ep_refs;
struct {
- grpc_combiner *combiner;
+ gpr_mu mu;
/** is a thread currently in the global lock */
bool global_active;
@@ -341,8 +341,9 @@ struct grpc_chttp2_transport {
bool parsing_active;
/** write execution state of the transport */
grpc_chttp2_write_state write_state;
- /** has a check_read_ops been scheduled */
- bool check_read_ops_scheduled;
+
+ grpc_chttp2_executor_action_header *pending_actions_head;
+ grpc_chttp2_executor_action_header *pending_actions_tail;
} executor;
/** is the transport destroying itself? */
@@ -379,16 +380,10 @@ struct grpc_chttp2_transport {
grpc_closure writing_action;
/** closure to start reading from the endpoint */
grpc_closure reading_action;
- grpc_closure reading_action_locked;
- grpc_closure post_parse_locked;
/** closure to actually do parsing */
grpc_closure parsing_action;
/** closure to initiate writing */
grpc_closure initiate_writing;
- /** closure to finish writing */
- grpc_closure terminate_writing;
- /** closure to flush read state up the stack */
- grpc_closure initiate_read_flush_locked;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@@ -532,16 +527,11 @@ struct grpc_chttp2_stream_parsing {
};
struct grpc_chttp2_stream {
- grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
grpc_chttp2_stream_global global;
grpc_chttp2_stream_writing writing;
grpc_chttp2_stream_parsing parsing;
- grpc_closure init_stream;
- grpc_closure destroy_stream;
- void *destroy_stream_arg;
-
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
uint8_t included[STREAM_LIST_COUNT];
};
@@ -636,7 +626,7 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_check_read_ops(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
bool grpc_chttp2_list_remove_check_read_ops(
grpc_chttp2_transport_global *transport_global,
@@ -716,6 +706,12 @@ void grpc_chttp2_complete_closure_step(
grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure,
grpc_error *error);
+void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *transport,
+ grpc_chttp2_stream *optional_stream,
+ grpc_chttp2_locked_action action,
+ void *arg, size_t sizeof_arg);
+
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 0e6d579ba9..482cd55c44 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -177,8 +177,7 @@ void grpc_chttp2_publish_reads(
stream_global->seen_error = true;
stream_global->exceeded_metadata_size =
stream_parsing->exceeded_metadata_size;
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
/* flush stats to global stream state */
@@ -204,8 +203,7 @@ void grpc_chttp2_publish_reads(
stream_global->incoming_frames.tail->is_tail = 0;
}
if (stream_parsing->data_parser.incoming_frames.head != NULL) {
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
grpc_chttp2_incoming_frame_queue_merge(
&stream_global->incoming_frames,
@@ -221,8 +219,7 @@ void grpc_chttp2_publish_reads(
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
stream_parsing->metadata_buffer[0],
stream_global->received_initial_metadata);
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (!stream_global->published_trailing_metadata &&
stream_parsing->got_metadata_on_parse[1]) {
@@ -231,8 +228,7 @@ void grpc_chttp2_publish_reads(
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
stream_parsing->metadata_buffer[1],
stream_global->received_trailing_metadata);
- grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
- stream_global);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (stream_parsing->forced_close_error != GRPC_ERROR_NONE) {
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index 4dc4968248..2eb5f5f632 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -298,15 +298,8 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
}
void grpc_chttp2_list_add_check_read_ops(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
- grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
- if (!t->executor.check_read_ops_scheduled) {
- grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
- &t->initiate_read_flush_locked,
- GRPC_ERROR_NONE, false);
- t->executor.check_read_ops_scheduled = true;
- }
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_CHECK_READ_OPS);
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 979515bd54..311b26e354 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -55,6 +55,15 @@ int grpc_chttp2_unlocking_check_writes(
transport_global->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
+ /* simple writes are queued to qbuf, and flushed here */
+ gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf);
+ GPR_ASSERT(transport_global->qbuf.count == 0);
+
+ grpc_chttp2_hpack_compressor_set_max_table_size(
+ &transport_writing->hpack_compressor,
+ transport_global->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
+
if (transport_global->dirtied_local_settings &&
!transport_global->sent_local_settings) {
gpr_slice_buffer_add(
@@ -68,16 +77,6 @@ int grpc_chttp2_unlocking_check_writes(
transport_global->sent_local_settings = 1;
}
- /* simple writes are queued to qbuf, and flushed here */
- gpr_slice_buffer_move_into(&transport_global->qbuf,
- &transport_writing->outbuf);
- GPR_ASSERT(transport_global->qbuf.count == 0);
-
- grpc_chttp2_hpack_compressor_set_max_table_size(
- &transport_writing->hpack_compressor,
- transport_global->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
-
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
if (transport_writing->outgoing_window > 0) {
@@ -345,7 +344,6 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_cleanup_writing(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
- GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0);
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
@@ -384,5 +382,4 @@ void grpc_chttp2_cleanup_writing(
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
- GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0);
}
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 0655b9353f..98f304f2da 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -32,7 +32,6 @@
*/
#include "src/core/lib/channel/channel_stack.h"
-#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <stdlib.h>
@@ -271,27 +270,21 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
sizeof(grpc_call_stack)));
}
-static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) {
- gpr_free(op);
-}
-
void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem) {
- grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
- memset(op, 0, sizeof(*op));
- op->cancel_error = GRPC_ERROR_CANCELLED;
- op->on_complete = grpc_closure_create(destroy_op, op);
- grpc_call_next_op(exec_ctx, cur_elem, op);
+ grpc_transport_stream_op op;
+ memset(&op, 0, sizeof(op));
+ op.cancel_error = GRPC_ERROR_CANCELLED;
+ grpc_call_next_op(exec_ctx, cur_elem, &op);
}
void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem,
grpc_status_code status,
gpr_slice *optional_message) {
- grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
- memset(op, 0, sizeof(*op));
- op->on_complete = grpc_closure_create(destroy_op, op);
- grpc_transport_stream_op_add_cancellation_with_message(op, status,
+ grpc_transport_stream_op op;
+ memset(&op, 0, sizeof(op));
+ grpc_transport_stream_op_add_cancellation_with_message(&op, status,
optional_message);
- grpc_call_next_op(exec_ctx, cur_elem, op);
+ grpc_call_next_op(exec_ctx, cur_elem, &op);
}
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 0981d59f63..134180e619 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -60,7 +60,7 @@ typedef struct call_data {
/** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm;
- grpc_transport_stream_op *send_op;
+ grpc_transport_stream_op send_op;
uint32_t send_length;
uint32_t send_flags;
gpr_slice incoming_slice;
@@ -199,11 +199,11 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
calld->send_flags);
- calld->send_op->send_message = &calld->replacement_stream.base;
- calld->post_send = calld->send_op->on_complete;
- calld->send_op->on_complete = &calld->send_done;
+ calld->send_op.send_message = &calld->replacement_stream.base;
+ calld->post_send = calld->send_op.on_complete;
+ calld->send_op.on_complete = &calld->send_done;
- grpc_call_next_op(exec_ctx, elem, calld->send_op);
+ grpc_call_next_op(exec_ctx, elem, &calld->send_op);
}
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
@@ -220,7 +220,7 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
- while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message,
+ while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
&calld->incoming_slice, ~(size_t)0,
&calld->got_slice)) {
gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
@@ -243,7 +243,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
if (op->send_message != NULL && !skip_compression(elem) &&
0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) {
- calld->send_op = op;
+ calld->send_op = *op;
calld->send_length = op->send_message->length;
calld->send_flags = op->send_message->flags;
continue_send_message(exec_ctx, elem);
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index 1ba0a5c141..0b6c3b2539 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -41,10 +41,6 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
closure->cb_arg = cb_arg;
}
-void grpc_closure_list_init(grpc_closure_list *closure_list) {
- closure_list->head = closure_list->tail = NULL;
-}
-
void grpc_closure_list_append(grpc_closure_list *closure_list,
grpc_closure *closure, grpc_error *error) {
if (closure == NULL) {
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index c1a22b6021..08e59a168e 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -37,7 +37,6 @@
#include <grpc/support/port_platform.h>
#include <stdbool.h>
#include "src/core/lib/iomgr/error.h"
-#include "src/core/lib/support/mpscq.h"
struct grpc_closure;
typedef struct grpc_closure grpc_closure;
@@ -61,14 +60,6 @@ typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg,
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
- /** Once queued, next indicates the next queued closure; before then, scratch
- * space */
- union {
- grpc_closure *next;
- gpr_mpscq_node atm_next;
- uintptr_t scratch;
- } next_data;
-
/** Bound callback. */
grpc_iomgr_cb_func cb;
@@ -77,6 +68,13 @@ struct grpc_closure {
/** Once queued, the result of the closure. Before then: scratch space */
grpc_error *error;
+
+ /** Once queued, next indicates the next queued closure; before then, scratch
+ * space */
+ union {
+ grpc_closure *next;
+ uintptr_t scratch;
+ } next_data;
};
/** Initializes \a closure with \a cb and \a cb_arg. */
@@ -89,8 +87,6 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }
-void grpc_closure_list_init(grpc_closure_list *list);
-
/** add \a closure to the end of \a list
and set \a closure's result to \a error */
void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure,
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
deleted file mode 100644
index 831bdb4aff..0000000000
--- a/src/core/lib/iomgr/combiner.c
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/iomgr/combiner.h"
-
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-#include "src/core/lib/iomgr/workqueue.h"
-#include "src/core/lib/profiling/timers.h"
-
-int grpc_combiner_trace = 0;
-
-#define GRPC_COMBINER_TRACE(fn) \
- do { \
- if (grpc_combiner_trace) { \
- fn; \
- } \
- } while (0)
-
-struct grpc_combiner {
- grpc_workqueue *optional_workqueue;
- gpr_mpscq queue;
- // state is:
- // lower bit - zero if orphaned
- // other bits - number of items queued on the lock
- gpr_atm state;
- bool take_async_break_before_final_list;
- grpc_closure_list final_list;
- grpc_closure continue_finishing;
-};
-
-grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
- grpc_combiner *lock = gpr_malloc(sizeof(*lock));
- lock->optional_workqueue = optional_workqueue;
- gpr_atm_no_barrier_store(&lock->state, 1);
- gpr_mpscq_init(&lock->queue);
- lock->take_async_break_before_final_list = false;
- grpc_closure_list_init(&lock->final_list);
- GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
- return lock;
-}
-
-static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock));
- GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
- gpr_mpscq_destroy(&lock->queue);
- GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner");
- gpr_free(lock);
-}
-
-void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -1);
- GRPC_COMBINER_TRACE(gpr_log(
- GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
- if (old_state == 1) {
- really_destroy(exec_ctx, lock);
- }
-}
-
-static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
-static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
-
-static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- GPR_TIMER_BEGIN("combiner.continue_executing_mainline", 0);
- grpc_combiner *lock = arg;
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p continue_finishing_mainline", lock));
- GPR_ASSERT(exec_ctx->active_combiner == NULL);
- exec_ctx->active_combiner = lock;
- if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- exec_ctx->active_combiner = NULL;
- GPR_TIMER_END("combiner.continue_executing_mainline", 0);
-}
-
-static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- GPR_TIMER_BEGIN("combiner.execute_final", 0);
- grpc_closure *c = lock->final_list.head;
- GPR_ASSERT(c != NULL);
- grpc_closure_list_init(&lock->final_list);
- lock->take_async_break_before_final_list = false;
- int loops = 0;
- while (c != NULL) {
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error;
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- loops++;
- }
- GPR_TIMER_END("combiner.execute_final", 0);
-}
-
-static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- GPR_TIMER_BEGIN("combiner.continue_executing_final", 0);
- grpc_combiner *lock = arg;
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p continue_executing_final", lock));
- GPR_ASSERT(exec_ctx->active_combiner == NULL);
- exec_ctx->active_combiner = lock;
- // quick peek to see if new things have turned up on the queue: if so, go back
- // to executing them before the final list
- if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) {
- if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
- } else {
- execute_final(exec_ctx, lock);
- finish(exec_ctx, lock);
- }
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- exec_ctx->active_combiner = NULL;
- GPR_TIMER_END("combiner.continue_executing_final", 0);
-}
-
-static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- GPR_TIMER_BEGIN("combiner.start_execute_final", 0);
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG,
- "C:%p start_execute_final take_async_break_before_final_list=%d",
- lock, lock->take_async_break_before_final_list));
- if (lock->take_async_break_before_final_list) {
- grpc_closure_init(&lock->continue_finishing, continue_executing_final,
- lock);
- grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
- GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
- GPR_TIMER_END("combiner.start_execute_final", 0);
- return false;
- } else {
- execute_final(exec_ctx, lock);
- GPR_TIMER_END("combiner.start_execute_final", 0);
- return true;
- }
-}
-
-static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- GPR_TIMER_BEGIN("combiner.maybe_finish_one", 0);
- gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- if (n == NULL) {
- // Queue is in an transiently inconsistent state: a new item is being queued
- // but is not visible to this thread yet.
- // Use this as a cue that we should go off and do something else for a while
- // (and come back later)
- grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline,
- lock);
- grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
- GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
- GPR_TIMER_END("combiner.maybe_finish_one", 0);
- return false;
- }
- grpc_closure *cl = (grpc_closure *)n;
- grpc_error *error = cl->error;
- cl->cb(exec_ctx, cl->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- GPR_TIMER_END("combiner.maybe_finish_one", 0);
- return true;
-}
-
-static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock);
- GPR_TIMER_BEGIN("combiner.finish", 0);
- int loops = 0;
- do {
- executor = maybe_finish_one;
- gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2);
- GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
- "C:%p finish[%d] old_state=%" PRIdPTR, lock,
- loops, old_state));
- switch (old_state) {
- default:
- // we have multiple queued work items: just continue executing them
- break;
- case 5: // we're down to one queued item: if it's the final list we
- case 4: // should do that
- if (!grpc_closure_list_empty(lock->final_list)) {
- executor = start_execute_final;
- }
- break;
- case 3: // had one count, one unorphaned --> unlocked unorphaned
- GPR_TIMER_END("combiner.finish", 0);
- return;
- case 2: // and one count, one orphaned --> unlocked and orphaned
- really_destroy(exec_ctx, lock);
- GPR_TIMER_END("combiner.finish", 0);
- return;
- case 1:
- case 0:
- // these values are illegal - representing an already unlocked or
- // deleted lock
- GPR_UNREACHABLE_CODE(return );
- }
- loops++;
- } while (executor(exec_ctx, lock));
- GPR_TIMER_END("combiner.finish", 0);
-}
-
-void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *cl, grpc_error *error) {
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl));
- GPR_TIMER_BEGIN("combiner.execute", 0);
- gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
- GPR_ASSERT(last & 1); // ensure lock has not been destroyed
- if (last == 1) {
- exec_ctx->active_combiner = lock;
- GPR_TIMER_BEGIN("combiner.execute_first_cb", 0);
- cl->cb(exec_ctx, cl->cb_arg, error);
- GPR_TIMER_END("combiner.execute_first_cb", 0);
- GRPC_ERROR_UNREF(error);
- finish(exec_ctx, lock);
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- exec_ctx->active_combiner = NULL;
- } else {
- cl->error = error;
- gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
- }
- GPR_TIMER_END("combiner.execute", 0);
-}
-
-static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
- grpc_error *error) {
- grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
- GRPC_ERROR_REF(error), false);
-}
-
-void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool force_async_break) {
- GRPC_COMBINER_TRACE(gpr_log(
- GPR_DEBUG,
- "C:%p grpc_combiner_execute_finally c=%p force_async_break=%d; ac=%p",
- lock, closure, force_async_break, exec_ctx->active_combiner));
- GPR_TIMER_BEGIN("combiner.execute_finally", 0);
- if (exec_ctx->active_combiner != lock) {
- GPR_TIMER_MARK("slowpath", 0);
- grpc_combiner_execute(exec_ctx, lock,
- grpc_closure_create(enqueue_finally, closure), error);
- GPR_TIMER_END("combiner.execute_finally", 0);
- return;
- }
-
- if (force_async_break) {
- lock->take_async_break_before_final_list = true;
- }
- if (grpc_closure_list_empty(lock->final_list)) {
- gpr_atm_full_fetch_add(&lock->state, 2);
- }
- grpc_closure_list_append(&lock->final_list, closure, error);
- GPR_TIMER_END("combiner.execute_finally", 0);
-}
-
-void grpc_combiner_force_async_finally(grpc_combiner *lock) {
- lock->take_async_break_before_final_list = true;
-}
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
deleted file mode 100644
index 1409db24b9..0000000000
--- a/src/core/lib/iomgr/combiner.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_COMBINER_H
-#define GRPC_CORE_LIB_IOMGR_COMBINER_H
-
-#include <stddef.h>
-
-#include <grpc/support/atm.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
-#include "src/core/lib/support/mpscq.h"
-
-// Provides serialized access to some resource.
-// Each action queued on a combiner is executed serially in a borrowed thread.
-// The actual thread executing actions may change over time (but there will only
-// every be one at a time).
-
-// Initialize the lock, with an optional workqueue to shift load to when
-// necessary
-grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
-// Destroy the lock
-void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
-// Execute \a action within the lock.
-void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error);
-// Execute \a action within the lock just prior to unlocking.
-// if \a hint_async_break is true, the combiner tries to hand execution to
-// another thread before finishing the primary queue of combined closures and
-// executing the finally list.
-// Deprecation warning: \a hint_async_break will be removed in a future version
-// Takes a very slow and round-about path if not called from a
-// grpc_combiner_execute closure.
-void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool hint_async_break);
-// Deprecated: force the finally list execution onto another thread
-void grpc_combiner_force_async_finally(grpc_combiner *lock);
-
-extern int grpc_combiner_trace;
-
-#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index e366961936..149c55663c 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -332,7 +332,7 @@ grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
return new;
}
-static const char *no_error_string = "\"No Error\"";
+static const char *no_error_string = "null";
static const char *oom_error_string = "\"Out of memory\"";
static const char *cancelled_error_string = "\"Cancelled\"";
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 740920d760..eba347125e 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -1531,8 +1531,6 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
- GPR_TIMER_BEGIN("pollset_add_fd", 0);
-
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&pollset->mu);
@@ -1645,8 +1643,6 @@ retry:
gpr_mu_unlock(&pollset->mu);
GRPC_LOG_IF_ERROR("pollset_add_fd", error);
-
- GPR_TIMER_END("pollset_add_fd", 0);
}
/*******************************************************************************
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 1895ee6245..917f332f03 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -40,8 +40,8 @@
/** A workqueue represents a list of work to be executed asynchronously.
Forward declared here to avoid a circular dependency with workqueue.h. */
+struct grpc_workqueue;
typedef struct grpc_workqueue grpc_workqueue;
-typedef struct grpc_combiner grpc_combiner;
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
/** Execution context.
@@ -66,15 +66,13 @@ typedef struct grpc_combiner grpc_combiner;
*/
struct grpc_exec_ctx {
grpc_closure_list closure_list;
- /** currently active combiner: updated only via combiner.c */
- grpc_combiner *active_combiner;
bool cached_ready_to_finish;
void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
- { GRPC_CLOSURE_LIST_INIT, NULL, false, finish_check_arg, finish_check }
+ { GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check }
#else
struct grpc_exec_ctx {
bool cached_ready_to_finish;
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 92767721d5..974d5ae479 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -379,19 +379,10 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
}
if (!tcp_flush(tcp, &error)) {
- if (grpc_tcp_trace) {
- gpr_log(GPR_DEBUG, "write: delayed");
- }
grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
} else {
cb = tcp->write_cb;
tcp->write_cb = NULL;
- if (grpc_tcp_trace) {
- const char *str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "write: %s", str);
- grpc_error_free_string(str);
- }
-
GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
cb->cb(exec_ctx, cb->cb_arg, error);
GPR_TIMER_END("tcp_handle_write.cb", 0);
@@ -434,16 +425,8 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (!tcp_flush(tcp, &error)) {
TCP_REF(tcp, "write");
tcp->write_cb = cb;
- if (grpc_tcp_trace) {
- gpr_log(GPR_DEBUG, "write: delayed");
- }
grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
} else {
- if (grpc_tcp_trace) {
- const char *str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "write: %s", str);
- grpc_error_free_string(str);
- }
grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
}
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index b2805dc66c..7156e490d7 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -50,6 +50,10 @@
/* grpc_workqueue is forward declared in exec_ctx.h */
+/* Deprecated: do not use.
+ This has *already* been removed in a future commit. */
+void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
+
/* Reference counting functions. Use the macro's always
(GRPC_WORKQUEUE_{REF,UNREF}).
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
index ecfea68f56..e0d6dac230 100644
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ b/src/core/lib/iomgr/workqueue_posix.c
@@ -44,7 +44,6 @@
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/ev_posix.h"
-#include "src/core/lib/profiling/timers.h"
static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
@@ -53,7 +52,8 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
char name[32];
*workqueue = gpr_malloc(sizeof(grpc_workqueue));
gpr_ref_init(&(*workqueue)->refs, 1);
- gpr_atm_no_barrier_store(&(*workqueue)->state, 1);
+ gpr_mu_init(&(*workqueue)->mu);
+ (*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL;
grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd);
if (err != GRPC_ERROR_NONE) {
gpr_free(*workqueue);
@@ -62,7 +62,6 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
sprintf(name, "workqueue:%p", (void *)(*workqueue));
(*workqueue)->wakeup_read_fd = grpc_fd_create(
GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name);
- gpr_mpscq_init(&(*workqueue)->queue);
grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue);
grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd,
&(*workqueue)->read_closure);
@@ -71,79 +70,57 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd);
}
-static void workqueue_orphan(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (gpr_atm_full_fetch_add(&workqueue->state, -1) == 1) {
- workqueue_destroy(exec_ctx, workqueue);
- }
-}
-
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason) {
- if (workqueue == NULL) return;
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s",
workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1,
reason);
- gpr_ref(&workqueue->refs);
-}
#else
void grpc_workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue == NULL) return;
+#endif
gpr_ref(&workqueue->refs);
}
-#endif
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason) {
- if (workqueue == NULL) return;
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s",
workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1,
reason);
- if (gpr_unref(&workqueue->refs)) {
- workqueue_orphan(exec_ctx, workqueue);
- }
-}
#else
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- if (workqueue == NULL) return;
+#endif
if (gpr_unref(&workqueue->refs)) {
- workqueue_orphan(exec_ctx, workqueue);
+ workqueue_destroy(exec_ctx, workqueue);
}
}
-#endif
-
-static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- abort();
-}
-static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- GPR_TIMER_MARK("workqueue.wakeup", 0);
- grpc_error *err = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
- if (!GRPC_LOG_IF_ERROR("wakeupfd_wakeup", err)) {
- drain(exec_ctx, workqueue);
- }
+void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
+ gpr_mu_lock(&workqueue->mu);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
+ gpr_mu_unlock(&workqueue->mu);
}
static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.on_readable", 0);
-
grpc_workqueue *workqueue = arg;
if (error != GRPC_ERROR_NONE) {
+ gpr_mu_destroy(&workqueue->mu);
/* HACK: let wakeup_fd code know that we stole the fd */
workqueue->wakeup_fd.read_fd = 0;
grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy");
- GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0);
gpr_free(workqueue);
} else {
+ gpr_mu_lock(&workqueue->mu);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
- gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue);
+ gpr_mu_unlock(&workqueue->mu);
if (error == GRPC_ERROR_NONE) {
grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
&workqueue->read_closure);
@@ -151,46 +128,24 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* recurse to get error handling */
on_readable(exec_ctx, arg, error);
}
- if (n == NULL) {
- /* try again - queue in an inconsistant state */
- wakeup(exec_ctx, workqueue);
- } else {
- switch (gpr_atm_full_fetch_add(&workqueue->state, -2)) {
- case 3: // had one count, one unorphaned --> done, unorphaned
- break;
- case 2: // had one count, one orphaned --> done, orphaned
- workqueue_destroy(exec_ctx, workqueue);
- break;
- case 1:
- case 0:
- // these values are illegal - representing an already done or
- // deleted workqueue
- GPR_UNREACHABLE_CODE(break);
- default:
- // schedule a wakeup since there's more to do
- wakeup(exec_ctx, workqueue);
- }
- grpc_closure *cl = (grpc_closure *)n;
- grpc_error *clerr = cl->error;
- cl->cb(exec_ctx, cl->cb_arg, clerr);
- GRPC_ERROR_UNREF(clerr);
- }
}
-
- GPR_TIMER_END("workqueue.on_readable", 0);
}
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2);
- GPR_ASSERT(last & 1);
- closure->error = error;
- gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next);
- if (last == 1) {
- wakeup(exec_ctx, workqueue);
+ grpc_error *push_error = GRPC_ERROR_NONE;
+ gpr_mu_lock(&workqueue->mu);
+ if (grpc_closure_list_empty(workqueue->closure_list)) {
+ push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
+ }
+ grpc_closure_list_append(&workqueue->closure_list, closure, error);
+ if (push_error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(push_error);
+ gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg);
+ grpc_error_free_string(msg);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
}
- GPR_TIMER_END("workqueue.enqueue", 0);
+ gpr_mu_unlock(&workqueue->mu);
}
#endif /* GPR_POSIX_SOCKET */
diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h
index 03ee21cef7..0f26ba58e2 100644
--- a/src/core/lib/iomgr/workqueue_posix.h
+++ b/src/core/lib/iomgr/workqueue_posix.h
@@ -35,17 +35,14 @@
#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/support/mpscq.h"
struct grpc_fd;
struct grpc_workqueue {
gpr_refcount refs;
- gpr_mpscq queue;
- // state is:
- // lower bit - zero if orphaned
- // other bits - number of items enqueued
- gpr_atm state;
+
+ gpr_mu mu;
+ grpc_closure_list closure_list;
grpc_wakeup_fd wakeup_fd;
struct grpc_fd *wakeup_read_fd;
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index ee81dc248e..23e2dea185 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -42,6 +42,8 @@
// context, which is at least correct, if not performant or in the spirit of
// workqueues.
+void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
+
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason) {}
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index b366d1410f..2a1bf4d4e3 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -40,7 +40,6 @@
#include <grpc/support/string_util.h>
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/transport/security_connector.h"
@@ -219,8 +218,6 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data,
static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
- GPR_TIMER_BEGIN("auth_start_transport_op", 0);
-
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
@@ -261,14 +258,12 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector_check_call_host(
exec_ctx, chand->security_connector, call_host, chand->auth_context,
on_host_checked, elem);
- GPR_TIMER_END("auth_start_transport_op", 0);
return; /* early exit */
}
}
/* pass control down the stack */
grpc_call_next_op(exec_ctx, elem, op);
- GPR_TIMER_END("auth_start_transport_op", 0);
}
/* Constructor for call_data */
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index acb0113ea8..0169ccd9ef 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -38,7 +38,6 @@
#include <grpc/support/slice_buffer.h>
#include <grpc/support/sync.h>
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/security/transport/tsi_error.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/tsi/transport_security_interface.h"
@@ -249,8 +248,6 @@ static void flush_write_staging_buffer(secure_endpoint *ep, uint8_t **cur,
static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
gpr_slice_buffer *slices, grpc_closure *cb) {
- GPR_TIMER_BEGIN("secure_endpoint.endpoint_write", 0);
-
unsigned i;
tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)secure_ep;
@@ -326,12 +323,10 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
exec_ctx, cb,
grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result),
NULL);
- GPR_TIMER_END("secure_endpoint.endpoint_write", 0);
return;
}
grpc_endpoint_write(exec_ctx, ep->wrapped_ep, &ep->output_buffer, cb);
- GPR_TIMER_END("secure_endpoint.endpoint_write", 0);
}
static void endpoint_shutdown(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index b2c6815af8..def16c8229 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -48,7 +48,7 @@ typedef struct call_data {
up-call on transport_op, and remember to call our on_done_recv member after
handling it. */
grpc_closure auth_on_recv;
- grpc_transport_stream_op *transport_op;
+ grpc_transport_stream_op transport_op;
grpc_metadata_array md;
const grpc_metadata *consumed_md;
size_t num_consumed_md;
@@ -106,10 +106,6 @@ static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) {
return md;
}
-static void destroy_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- gpr_free(arg);
-}
-
/* called from application code */
static void on_md_processing_done(
void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
@@ -135,22 +131,21 @@ static void on_md_processing_done(
grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL);
} else {
gpr_slice message;
- grpc_transport_stream_op *close_op = gpr_malloc(sizeof(*close_op));
- memset(close_op, 0, sizeof(*close_op));
+ grpc_transport_stream_op close_op;
+ memset(&close_op, 0, sizeof(close_op));
grpc_metadata_array_destroy(&calld->md);
error_details = error_details != NULL
? error_details
: "Authentication metadata processing failed.";
message = gpr_slice_from_copied_string(error_details);
- calld->transport_op->send_initial_metadata = NULL;
- if (calld->transport_op->send_message != NULL) {
- grpc_byte_stream_destroy(&exec_ctx, calld->transport_op->send_message);
- calld->transport_op->send_message = NULL;
+ calld->transport_op.send_initial_metadata = NULL;
+ if (calld->transport_op.send_message != NULL) {
+ grpc_byte_stream_destroy(&exec_ctx, calld->transport_op.send_message);
+ calld->transport_op.send_message = NULL;
}
- calld->transport_op->send_trailing_metadata = NULL;
- close_op->on_complete = grpc_closure_create(destroy_op, close_op);
- grpc_transport_stream_op_add_close(close_op, status, &message);
- grpc_call_next_op(&exec_ctx, elem, close_op);
+ calld->transport_op.send_trailing_metadata = NULL;
+ grpc_transport_stream_op_add_close(&close_op, status, &message);
+ grpc_call_next_op(&exec_ctx, elem, &close_op);
grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv,
grpc_error_set_int(GRPC_ERROR_CREATE(error_details),
GRPC_ERROR_INT_GRPC_STATUS, status),
@@ -187,7 +182,7 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem,
calld->recv_initial_metadata = op->recv_initial_metadata;
calld->on_done_recv = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->auth_on_recv;
- calld->transport_op = op;
+ calld->transport_op = *op;
}
}
diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c
deleted file mode 100644
index 5b9323275a..0000000000
--- a/src/core/lib/support/mpscq.c
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/support/mpscq.h"
-
-#include <grpc/support/log.h>
-
-void gpr_mpscq_init(gpr_mpscq *q) {
- gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub);
- q->tail = &q->stub;
- gpr_atm_no_barrier_store(&q->stub.next, (gpr_atm)NULL);
-}
-
-void gpr_mpscq_destroy(gpr_mpscq *q) {
- GPR_ASSERT(gpr_atm_no_barrier_load(&q->head) == (gpr_atm)&q->stub);
- GPR_ASSERT(q->tail == &q->stub);
-}
-
-void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
- gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
- gpr_mpscq_node *prev =
- (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
- gpr_atm_rel_store(&prev->next, (gpr_atm)n);
-}
-
-gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
- gpr_mpscq_node *tail = q->tail;
- gpr_mpscq_node *next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
- if (tail == &q->stub) {
- // indicates the list is actually (ephemerally) empty
- if (next == NULL) return NULL;
- q->tail = next;
- tail = next;
- next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
- }
- if (next != NULL) {
- q->tail = next;
- return tail;
- }
- gpr_mpscq_node *head = (gpr_mpscq_node *)gpr_atm_acq_load(&q->head);
- if (tail != head) {
- // indicates a retry is in order: we're still adding
- return NULL;
- }
- gpr_mpscq_push(q, &q->stub);
- next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
- if (next != NULL) {
- q->tail = next;
- return tail;
- }
- // indicates a retry is in order: we're still adding
- return NULL;
-}
diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h
deleted file mode 100644
index 977a117952..0000000000
--- a/src/core/lib/support/mpscq.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_SUPPORT_MPSCQ_H
-#define GRPC_CORE_LIB_SUPPORT_MPSCQ_H
-
-#include <grpc/support/atm.h>
-#include <stddef.h>
-
-// Multiple-producer single-consumer lock free queue, based upon the
-// implementation from Dmitry Vyukov here:
-// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
-
-// List node (include this in a data structure at the top, and add application
-// fields after it - to simulate inheritance)
-typedef struct gpr_mpscq_node { gpr_atm next; } gpr_mpscq_node;
-
-// Actual queue type
-typedef struct gpr_mpscq {
- gpr_atm head;
- // make sure head & tail don't share a cacheline
- char padding[GPR_CACHELINE_SIZE];
- gpr_mpscq_node *tail;
- gpr_mpscq_node stub;
-} gpr_mpscq;
-
-void gpr_mpscq_init(gpr_mpscq *q);
-void gpr_mpscq_destroy(gpr_mpscq *q);
-// Push a node
-void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
-// Pop a node (returns NULL if no node is ready - which doesn't indicate that
-// the queue is empty!!)
-gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q);
-
-#endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 119f5e82ab..772681109a 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -109,10 +109,6 @@ typedef struct batch_control {
uint8_t recv_message;
uint8_t recv_final_op;
uint8_t is_notify_tag_closure;
-
- /* TODO(ctiller): now that this is inlined, figure out how much of the above
- state can be eliminated */
- grpc_transport_stream_op op;
} batch_control;
struct grpc_call {
@@ -782,7 +778,6 @@ typedef struct termination_closure {
grpc_error *error;
grpc_closure *op_closure;
enum { TC_CANCEL, TC_CLOSE } type;
- grpc_transport_stream_op op;
} termination_closure;
static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
@@ -802,24 +797,26 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
}
static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
+ grpc_transport_stream_op op;
termination_closure *tc = tcp;
- memset(&tc->op, 0, sizeof(tc->op));
- tc->op.cancel_error = tc->error;
+ memset(&op, 0, sizeof(op));
+ op.cancel_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
- tc->op.on_complete = &tc->closure;
- execute_op(exec_ctx, tc->call, &tc->op);
+ op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &op);
}
static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
+ grpc_transport_stream_op op;
termination_closure *tc = tcp;
- memset(&tc->op, 0, sizeof(tc->op));
- tc->op.close_error = tc->error;
+ memset(&op, 0, sizeof(op));
+ op.close_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
- tc->op_closure = tc->op.on_complete;
- tc->op.on_complete = &tc->closure;
- execute_op(exec_ctx, tc->call, &tc->op);
+ tc->op_closure = op.on_complete;
+ op.on_complete = &tc->closure;
+ execute_op(exec_ctx, tc->call, &op);
}
static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
@@ -1373,6 +1370,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_call *call, const grpc_op *ops,
size_t nops, void *notify_tag,
int is_notify_tag_closure) {
+ grpc_transport_stream_op stream_op;
size_t i;
const grpc_op *op;
batch_control *bctl;
@@ -1386,6 +1384,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
+ memset(&stream_op, 0, sizeof(stream_op));
+
/* TODO(ctiller): this feels like it could be made lock-free */
gpr_mu_lock(&call->mu);
bctl = allocate_batch_control(call);
@@ -1394,9 +1394,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->notify_tag = notify_tag;
bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
- grpc_transport_stream_op *stream_op = &bctl->op;
- memset(stream_op, 0, sizeof(*stream_op));
-
if (nops == 0) {
GRPC_CALL_INTERNAL_REF(call, "completion");
bctl->error = GRPC_ERROR_NONE;
@@ -1474,9 +1471,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
/* TODO(ctiller): just make these the same variable? */
call->metadata_batch[0][0].deadline = call->send_deadline;
- stream_op->send_initial_metadata =
+ stream_op.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- stream_op->send_initial_metadata_flags = op->flags;
+ stream_op.send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1496,7 +1493,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_stream_init(
&call->sending_stream,
&op->data.send_message->data.raw.slice_buffer, op->flags);
- stream_op->send_message = &call->sending_stream.base;
+ stream_op.send_message = &call->sending_stream.base;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */
@@ -1514,7 +1511,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
bctl->send_final_op = 1;
call->sent_final_op = 1;
- stream_op->send_trailing_metadata =
+ stream_op.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
@@ -1561,7 +1558,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
- stream_op->send_trailing_metadata =
+ stream_op.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_RECV_INITIAL_METADATA:
@@ -1579,9 +1576,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&call->receiving_initial_metadata_ready,
receiving_initial_metadata_ready, bctl);
bctl->recv_initial_metadata = 1;
- stream_op->recv_initial_metadata =
+ stream_op.recv_initial_metadata =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- stream_op->recv_initial_metadata_ready =
+ stream_op.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
num_completion_callbacks_needed++;
break;
@@ -1598,10 +1595,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->receiving_message = 1;
bctl->recv_message = 1;
call->receiving_buffer = op->data.recv_message;
- stream_op->recv_message = &call->receiving_stream;
+ stream_op.recv_message = &call->receiving_stream;
grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
bctl);
- stream_op->recv_message_ready = &call->receiving_stream_ready;
+ stream_op.recv_message_ready = &call->receiving_stream_ready;
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
@@ -1627,9 +1624,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->final_op.client.status_details_capacity =
op->data.recv_status_on_client.status_details_capacity;
bctl->recv_final_op = 1;
- stream_op->recv_trailing_metadata =
+ stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op->collect_stats =
+ stream_op.collect_stats =
&call->final_info.stats.transport_stream_stats;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
@@ -1650,9 +1647,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
bctl->recv_final_op = 1;
- stream_op->recv_trailing_metadata =
+ stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op->collect_stats =
+ stream_op.collect_stats =
&call->final_info.stats.transport_stream_stats;
break;
}
@@ -1664,12 +1661,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
- stream_op->context = call->context;
+ stream_op.context = call->context;
grpc_closure_init(&bctl->finish_batch, finish_batch, bctl);
- stream_op->on_complete = &bctl->finish_batch;
+ stream_op.on_complete = &bctl->finish_batch;
gpr_mu_unlock(&call->mu);
- execute_op(exec_ctx, call, stream_op);
+ execute_op(exec_ctx, call, &stream_op);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 52e78567bd..6d2b1c4935 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -334,13 +334,14 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
}
void grpc_channel_destroy(grpc_channel *channel) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_transport_op op;
grpc_channel_element *elem;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel));
- op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
+ memset(&op, 0, sizeof(op));
+ op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
- elem->filter->start_transport_op(&exec_ctx, elem, op);
+ elem->filter->start_transport_op(&exec_ctx, elem, &op);
GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel");
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 0d2f01a649..9818f9d2f2 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -61,20 +61,19 @@ static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
void *tag, void *reserved) {
- GRPC_API_TRACE("grpc_channel_ping(channel=%p, cq=%p, tag=%p, reserved=%p)", 4,
- (channel, cq, tag, reserved));
- grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_transport_op op;
ping_result *pr = gpr_malloc(sizeof(*pr));
grpc_channel_element *top_elem =
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(reserved == NULL);
+ memset(&op, 0, sizeof(op));
pr->tag = tag;
pr->cq = cq;
grpc_closure_init(&pr->closure, ping_done, pr);
- op->send_ping = &pr->closure;
- op->bind_pollset = grpc_cq_pollset(cq);
+ op.send_ping = &pr->closure;
+ op.bind_pollset = grpc_cq_pollset(cq);
grpc_cq_begin_op(cq, tag);
- top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
+ top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index edda0c85fa..5397913a21 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -47,7 +47,6 @@
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
-#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/profiling/timers.h"
@@ -166,7 +165,6 @@ void grpc_init(void) {
grpc_register_tracer("http1", &grpc_http1_trace);
grpc_register_tracer("compression", &grpc_compression_trace);
grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace);
- grpc_register_tracer("combiner", &grpc_combiner_trace);
// Default pluck trace to 1
grpc_cq_pluck_trace = 1;
grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace);
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index d32c884e8e..19b78369dd 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -97,14 +97,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change,
GRPC_ERROR_NONE, NULL);
}
+ if (op->on_consumed != NULL) {
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+ }
if (op->send_ping != NULL) {
grpc_exec_ctx_sched(exec_ctx, op->send_ping,
GRPC_ERROR_CREATE("lame client channel"), NULL);
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
- if (op->on_consumed != NULL) {
- grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
- }
}
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 56fb80e92e..55e6d99057 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -273,20 +273,23 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
}
static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
- int send_goaway, grpc_error *send_disconnect) {
- struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc));
- grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
- grpc_transport_op *op = grpc_make_transport_op(&sc->closure);
+ bool send_goaway, grpc_error *send_disconnect) {
+ grpc_transport_op op;
+ struct shutdown_cleanup_args *sc;
grpc_channel_element *elem;
- op->send_goaway = send_goaway;
+ memset(&op, 0, sizeof(op));
+ op.send_goaway = send_goaway;
+ sc = gpr_malloc(sizeof(*sc));
sc->slice = gpr_slice_from_copied_string("Server shutdown");
- op->goaway_message = &sc->slice;
- op->goaway_status = GRPC_STATUS_OK;
- op->disconnect_with_error = send_disconnect;
+ op.goaway_message = &sc->slice;
+ op.goaway_status = GRPC_STATUS_OK;
+ op.disconnect_with_error = send_disconnect;
+ grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
+ op.on_consumed = &sc->closure;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
- elem->filter->start_transport_op(exec_ctx, elem, op);
+ elem->filter->start_transport_op(exec_ctx, elem, &op);
}
static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
@@ -429,8 +432,7 @@ static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd,
server_unref(exec_ctx, server);
}
-static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
- grpc_error *error) {
+static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
if (is_channel_orphaned(chand)) return;
GPR_ASSERT(chand->server != NULL);
orphan_channel(chand);
@@ -439,20 +441,14 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- grpc_transport_op *op =
- grpc_make_transport_op(&chand->finish_destroy_channel_closure);
- op->set_accept_stream = true;
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.set_accept_stream = true;
+ op.on_consumed = &chand->finish_destroy_channel_closure;
grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- op);
-
- if (error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(error);
- gpr_log(GPR_INFO, "Disconnected client: %s", msg);
- grpc_error_free_string(msg);
- }
- GRPC_ERROR_UNREF(error);
+ &op);
}
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
@@ -849,16 +845,17 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
channel_data *chand = cd;
grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
- op->on_connectivity_state_change = &chand->channel_connectivity_changed,
- op->connectivity_state = &chand->connectivity_state;
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op.connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- op);
+ &op);
} else {
gpr_mu_lock(&server->mu_global);
- destroy_channel(exec_ctx, chand, GRPC_ERROR_REF(error));
+ destroy_channel(exec_ctx, chand);
gpr_mu_unlock(&server->mu_global);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity");
}
@@ -1122,7 +1119,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
size_t slots;
uint32_t probes;
uint32_t max_probes = 0;
- grpc_transport_op *op = NULL;
+ grpc_transport_op op;
channel =
grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);
@@ -1182,16 +1179,16 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
gpr_mu_unlock(&s->mu_global);
GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
- op = grpc_make_transport_op(NULL);
- op->set_accept_stream = true;
- op->set_accept_stream_fn = accept_stream;
- op->set_accept_stream_user_data = chand;
- op->on_connectivity_state_change = &chand->channel_connectivity_changed;
- op->connectivity_state = &chand->connectivity_state;
+ memset(&op, 0, sizeof(op));
+ op.set_accept_stream = true;
+ op.set_accept_stream_fn = accept_stream;
+ op.set_accept_stream_user_data = chand;
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op.connectivity_state = &chand->connectivity_state;
if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
- op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
+ op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
}
- grpc_transport_perform_op(exec_ctx, transport, op);
+ grpc_transport_perform_op(exec_ctx, transport, &op);
}
void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 08f9d7e8d9..857c3909d2 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -32,14 +32,10 @@
*/
#include "src/core/lib/transport/transport.h"
-
-#include <string.h>
-
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
-
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -251,26 +247,3 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
add_error(op, &op->close_error, error);
}
-
-typedef struct {
- grpc_closure outer_on_complete;
- grpc_closure *inner_on_complete;
- grpc_transport_op op;
-} made_transport_op;
-
-static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- made_transport_op *op = arg;
- grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
- NULL);
- gpr_free(op);
-}
-
-grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
- made_transport_op *op = gpr_malloc(sizeof(*op));
- grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op);
- op->inner_on_complete = on_complete;
- memset(&op->op, 0, sizeof(op->op));
- op->op.on_consumed = &op->outer_on_complete;
- return &op->op;
-}
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 8dc393fd61..26ed6cb839 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -100,11 +100,6 @@ void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from,
void grpc_transport_move_stats(grpc_transport_stream_stats *from,
grpc_transport_stream_stats *to);
-typedef struct {
- grpc_closure closure;
- void *args[2];
-} grpc_transport_private_op_data;
-
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
@@ -154,12 +149,6 @@ typedef struct grpc_transport_stream_op {
/* Indexes correspond to grpc_context_index enum values */
grpc_call_context_element *context;
-
- /***************************************************************************
- * remaining fields are initialized and used at the discretion of the
- * transport implementation */
-
- grpc_transport_private_op_data transport_private;
} grpc_transport_stream_op;
/** Transport op: a set of operations to perform on a transport as a whole */
@@ -193,12 +182,6 @@ typedef struct grpc_transport_op {
grpc_pollset_set *bind_pollset_set;
/** send a ping, call this back if not NULL */
grpc_closure *send_ping;
-
- /***************************************************************************
- * remaining fields are initialized and used at the discretion of the
- * transport implementation */
-
- grpc_transport_private_op_data transport_private;
} grpc_transport_op;
/* Returns the amount of memory required to store a grpc_stream for this
@@ -290,10 +273,6 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport);
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
grpc_transport *transport);
-/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
- \a on_consumed and then delete the returned transport op */
-grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed);
-
#ifdef __cplusplus
}
#endif
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index d53f46b18b..be7f30c29b 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -50,7 +50,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/support/log_linux.c',
'src/core/lib/support/log_posix.c',
'src/core/lib/support/log_windows.c',
- 'src/core/lib/support/mpscq.c',
'src/core/lib/support/murmur_hash.c',
'src/core/lib/support/percent_encoding.c',
'src/core/lib/support/slice.c',
@@ -93,7 +92,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/http/httpcli.c',
'src/core/lib/http/parser.c',
'src/core/lib/iomgr/closure.c',
- 'src/core/lib/iomgr/combiner.c',
'src/core/lib/iomgr/endpoint.c',
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',