From 537f7c2a136641487febeac89a25e430029eb40c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 12 Sep 2016 11:56:39 -0700 Subject: Revert "Grand unified closures" --- src/core/ext/client_config/client_channel.c | 14 +- src/core/ext/client_config/subchannel.c | 18 +- .../transport/chttp2/transport/chttp2_transport.c | 766 ++++++++++++--------- src/core/ext/transport/chttp2/transport/internal.h | 54 +- src/core/ext/transport/chttp2/transport/parsing.c | 12 +- .../ext/transport/chttp2/transport/stream_lists.c | 9 +- src/core/ext/transport/chttp2/transport/writing.c | 21 +- src/core/lib/channel/channel_stack.c | 23 +- src/core/lib/channel/compress_filter.c | 14 +- src/core/lib/iomgr/closure.c | 4 - src/core/lib/iomgr/closure.h | 18 +- src/core/lib/iomgr/combiner.c | 293 -------- src/core/lib/iomgr/combiner.h | 71 -- src/core/lib/iomgr/error.c | 2 +- src/core/lib/iomgr/ev_epoll_linux.c | 4 - src/core/lib/iomgr/exec_ctx.h | 6 +- src/core/lib/iomgr/tcp_posix.c | 17 - src/core/lib/iomgr/workqueue.h | 4 + src/core/lib/iomgr/workqueue_posix.c | 97 +-- src/core/lib/iomgr/workqueue_posix.h | 9 +- src/core/lib/iomgr/workqueue_windows.c | 2 + .../lib/security/transport/client_auth_filter.c | 5 - src/core/lib/security/transport/secure_endpoint.c | 5 - .../lib/security/transport/server_auth_filter.c | 27 +- src/core/lib/support/mpscq.c | 83 --- src/core/lib/support/mpscq.h | 65 -- src/core/lib/surface/call.c | 63 +- src/core/lib/surface/channel.c | 7 +- src/core/lib/surface/channel_ping.c | 11 +- src/core/lib/surface/init.c | 2 - src/core/lib/surface/lame_client.c | 6 +- src/core/lib/surface/server.c | 69 +- src/core/lib/transport/transport.c | 27 - src/core/lib/transport/transport.h | 21 - 34 files changed, 649 insertions(+), 1200 deletions(-) delete mode 100644 src/core/lib/iomgr/combiner.c delete mode 100644 src/core/lib/iomgr/combiner.h delete mode 100644 src/core/lib/support/mpscq.c delete mode 100644 src/core/lib/support/mpscq.h (limited to 'src/core') diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index be333f4e0d..61e012578e 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -387,7 +387,7 @@ typedef struct client_channel_call_data { grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; - grpc_transport_stream_op **waiting_ops; + grpc_transport_stream_op *waiting_ops; size_t waiting_ops_count; size_t waiting_ops_capacity; @@ -404,7 +404,7 @@ static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) { gpr_realloc(calld->waiting_ops, calld->waiting_ops_capacity * sizeof(*calld->waiting_ops)); } - calld->waiting_ops[calld->waiting_ops_count++] = op; + calld->waiting_ops[calld->waiting_ops_count++] = *op; GPR_TIMER_END("add_waiting_locked", 0); } @@ -413,14 +413,14 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, size_t i; for (i = 0; i < calld->waiting_ops_count; i++) { grpc_transport_stream_op_finish_with_failure( - exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error)); + exec_ctx, &calld->waiting_ops[i], GRPC_ERROR_REF(error)); } calld->waiting_ops_count = 0; GRPC_ERROR_UNREF(error); } typedef struct { - grpc_transport_stream_op **ops; + grpc_transport_stream_op *ops; size_t nops; grpc_subchannel_call *call; } retry_ops_args; @@ -429,7 +429,7 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { retry_ops_args *a = args; size_t i; for (i = 0; i < a->nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]); + grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]); } GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); gpr_free(a->ops); @@ -437,10 +437,6 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { } static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { - if (calld->waiting_ops_count == 0) { - return; - } - retry_ops_args *a = gpr_malloc(sizeof(*a)); a->ops = calld->waiting_ops; a->nops = calld->waiting_ops_count; diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 2c4364b259..df35904b85 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -504,13 +504,14 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *closure) { - grpc_transport_op *op = grpc_make_transport_op(NULL); + grpc_transport_op op; grpc_channel_element *elem; - op->connectivity_state = state; - op->on_connectivity_state_change = closure; - op->bind_pollset_set = interested_parties; + memset(&op, 0, sizeof(op)); + op.connectivity_state = state; + op.on_connectivity_state_change = closure; + op.bind_pollset_set = interested_parties; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, op); + elem->filter->start_transport_op(exec_ctx, elem, &op); } void grpc_connected_subchannel_notify_on_state_change( @@ -524,11 +525,12 @@ void grpc_connected_subchannel_notify_on_state_change( void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_closure *closure) { - grpc_transport_op *op = grpc_make_transport_op(NULL); + grpc_transport_op op; grpc_channel_element *elem; - op->send_ping = closure; + memset(&op, 0, sizeof(op)); + op.send_ping = closure; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, op); + elem->filter->start_transport_op(exec_ctx, elem, &op); } static void publish_transport_locked(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 53da0e5d70..00999e3b94 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -89,20 +89,13 @@ static const grpc_transport_vtable vtable; static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); -static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); -static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); +static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_error *error); + grpc_chttp2_transport *t, grpc_error *error, + const char *reason); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -112,6 +105,11 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); +/** Perform a transport_op */ +static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *transport_op); + /** Cancel a stream: coming from the transport API */ static void cancel_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, @@ -123,10 +121,22 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global, grpc_error *error); +/** Add endpoint from this transport to pollset */ +static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_ignored, void *pollset); +static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_ignored, + void *pollset_set); + /** Start new streams that have been created if we can */ static void maybe_start_some_streams( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); +static void finish_global_actions(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); + static void connectivity_state_set( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state, grpc_error *error, const char *reason); @@ -139,16 +149,14 @@ static void incoming_byte_stream_update_flow_control( grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already); static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, - void *byte_stream, - grpc_error *error_ignored); + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *byte_stream); static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_error *error); -static void set_write_state(grpc_chttp2_transport *t, - grpc_chttp2_write_state state, const char *reason); - /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -157,7 +165,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { size_t i; - grpc_endpoint_destroy(exec_ctx, t->ep); + gpr_mu_lock(&t->executor.mu); + + GPR_ASSERT(t->ep == NULL); gpr_slice_buffer_destroy(&t->global.qbuf); @@ -181,7 +191,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_destroy(&t->new_stream_map); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); - grpc_combiner_destroy(exec_ctx, t->executor.combiner); + gpr_mu_unlock(&t->executor.mu); + gpr_mu_destroy(&t->executor.mu); /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ @@ -239,13 +250,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, memset(t, 0, sizeof(*t)); t->base.vtable = &vtable; - t->executor.write_state = GRPC_CHTTP2_WRITES_CORKED; t->ep = ep; - /* one ref is for destroy */ - gpr_ref_init(&t->refs, 1); + /* one ref is for destroy, the other for when ep becomes NULL */ + gpr_ref_init(&t->refs, 2); /* ref is dropped at transport close() */ gpr_ref_init(&t->shutdown_ep_refs, 1); - t->executor.combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep)); + gpr_mu_init(&t->executor.mu); t->peer_string = grpc_endpoint_get_peer(ep); t->endpoint_reading = 1; t->global.next_stream_id = is_client ? 1 : 2; @@ -271,22 +281,23 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor); grpc_closure_init(&t->writing_action, writing_action, t); grpc_closure_init(&t->reading_action, reading_action, t); - grpc_closure_init(&t->reading_action_locked, reading_action_locked, t); grpc_closure_init(&t->parsing_action, parsing_action, t); - grpc_closure_init(&t->post_parse_locked, post_parse_locked, t); - grpc_closure_init(&t->initiate_writing, initiate_writing_locked, t); - grpc_closure_init(&t->terminate_writing, terminate_writing_with_lock, t); - grpc_closure_init(&t->initiate_read_flush_locked, initiate_read_flush_locked, - t); - grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, - &t->writing); + grpc_closure_init(&t->initiate_writing, initiate_writing, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser); + grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, + &t->writing); gpr_slice_buffer_init(&t->read_buffer); + if (is_client) { + gpr_slice_buffer_add( + &t->global.qbuf, + gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write"); + } /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet large enough that the exponential growth should happen nicely when it's @@ -309,13 +320,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->global.force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; t->global.sent_local_settings = 0; - if (is_client) { - gpr_slice_buffer_add( - &t->writing.outbuf, - gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); - grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write"); - } - /* configure http2 the way we like it */ if (is_client) { push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); @@ -420,39 +424,47 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } } - - set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "uncork"); - grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "init"); } -static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - grpc_chttp2_transport *t = tp; +static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_ignored, + void *arg_ignored) { t->destroying = 1; drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); - UNREF_TRANSPORT(exec_ctx, t, "destroy"); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_combiner_execute(exec_ctx, t->executor.combiner, - grpc_closure_create(destroy_transport_locked, t), - GRPC_ERROR_NONE); + grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, destroy_transport_locked, + NULL, 0); + UNREF_TRANSPORT(exec_ctx, t, "destroy"); } /** block grpc_endpoint_shutdown being called until a paired allow_endpoint_shutdown is made */ static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) { + GPR_ASSERT(t->ep); gpr_ref(&t->shutdown_ep_refs); } static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { if (gpr_unref(&t->shutdown_ep_refs)) { - grpc_endpoint_shutdown(exec_ctx, t->ep); + if (t->ep) { + grpc_endpoint_shutdown(exec_ctx, t->ep); + } } } +static void destroy_endpoint(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + grpc_endpoint_destroy(exec_ctx, t->ep); + t->ep = NULL; + /* safe because we'll still have the ref for write */ + UNREF_TRANSPORT(exec_ctx, t, "disconnect"); +} + static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { @@ -463,7 +475,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, t->closed = 1; connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); - allow_endpoint_shutdown_locked(exec_ctx, t); + if (t->ep) { + allow_endpoint_shutdown_locked(exec_ctx, t); + } /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream_global *stream_global; @@ -497,23 +511,21 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, } #endif -static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, - grpc_error *error) { - grpc_chttp2_stream *s = sp; - grpc_chttp2_register_stream(s->t, s); - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "init"); +static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *arg_ignored) { + grpc_chttp2_register_stream(t, s); } static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *server_data) { - GPR_TIMER_BEGIN("init_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; memset(s, 0, sizeof(*s)); - s->t = t; s->refcount = refcount; /* We reserve one 'active stream' that's dropped when the stream is read-closed. The others are for incoming_byte_streams that are actively @@ -548,21 +560,16 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->global.in_stream_map = true; } - grpc_closure_init(&s->init_stream, finish_init_stream_locked, s); - GRPC_CHTTP2_STREAM_REF(&s->global, "init"); - grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->init_stream, - GRPC_ERROR_NONE); - - GPR_TIMER_END("init_stream", 0); + grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked, + NULL, 0); return 0; } -static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, - grpc_error *error) { +static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *arg) { grpc_byte_stream *bs; - grpc_chttp2_stream *s = sp; - grpc_chttp2_transport *t = s->t; GPR_TIMER_BEGIN("destroy_stream", 0); @@ -581,7 +588,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, while ( (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) { - incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); } grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, @@ -618,20 +625,16 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_TIMER_END("destroy_stream", 0); - gpr_free(s->destroy_stream_arg); + gpr_free(arg); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, void *and_free_memory) { - GPR_TIMER_BEGIN("destroy_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - s->destroy_stream_arg = and_free_memory; - grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s); - grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->destroy_stream, - GRPC_ERROR_NONE); - GPR_TIMER_END("destroy_stream", 0); + grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked, + and_free_memory, 0); } grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( @@ -662,10 +665,12 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( static const char *write_state_name(grpc_chttp2_write_state state) { switch (state) { - case GRPC_CHTTP2_WRITES_CORKED: - return "CORKED"; case GRPC_CHTTP2_WRITING_INACTIVE: return "INACTIVE"; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + return "REQUESTED[p=0]"; + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + return "REQUESTED[p=1]"; case GRPC_CHTTP2_WRITE_SCHEDULED: return "SCHEDULED"; case GRPC_CHTTP2_WRITING: @@ -688,18 +693,120 @@ static void set_write_state(grpc_chttp2_transport *t, t->executor.write_state = state; } -static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - grpc_chttp2_transport *t = tp; - GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED); - start_writing(exec_ctx, t); +static void finish_global_actions(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + grpc_chttp2_executor_action_header *hdr; + grpc_chttp2_executor_action_header *next; + + GPR_TIMER_BEGIN("finish_global_actions", 0); + + for (;;) { + check_read_ops(exec_ctx, &t->global); + + gpr_mu_lock(&t->executor.mu); + if (t->executor.pending_actions_head != NULL) { + hdr = t->executor.pending_actions_head; + t->executor.pending_actions_head = t->executor.pending_actions_tail = + NULL; + gpr_mu_unlock(&t->executor.mu); + while (hdr != NULL) { + GPR_TIMER_BEGIN("chttp2:locked_action", 0); + hdr->action(exec_ctx, t, hdr->stream, hdr->arg); + GPR_TIMER_END("chttp2:locked_action", 0); + next = hdr->next; + gpr_free(hdr); + UNREF_TRANSPORT(exec_ctx, t, "pending_action"); + hdr = next; + } + continue; + } else { + t->executor.global_active = false; + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking"); + REF_TRANSPORT(t, "initiate_writing"); + gpr_mu_unlock(&t->executor.mu); + grpc_exec_ctx_sched( + exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE, + t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL); + break; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + start_writing(exec_ctx, t); + gpr_mu_unlock(&t->executor.mu); + break; + case GRPC_CHTTP2_WRITING_INACTIVE: + case GRPC_CHTTP2_WRITING: + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + case GRPC_CHTTP2_WRITE_SCHEDULED: + gpr_mu_unlock(&t->executor.mu); + break; + } + } + break; + } + + GPR_TIMER_END("finish_global_actions", 0); } -static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - grpc_chttp2_transport *t = tp; - t->executor.check_read_ops_scheduled = false; - check_read_ops(exec_ctx, &t->global); +void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *optional_stream, + grpc_chttp2_locked_action action, + void *arg, size_t sizeof_arg) { + grpc_chttp2_executor_action_header *hdr; + + GPR_TIMER_BEGIN("grpc_chttp2_run_with_global_lock", 0); + + REF_TRANSPORT(t, "run_global"); + gpr_mu_lock(&t->executor.mu); + + for (;;) { + if (!t->executor.global_active) { + t->executor.global_active = 1; + gpr_mu_unlock(&t->executor.mu); + + GPR_TIMER_BEGIN("chttp2:locked_action", 0); + action(exec_ctx, t, optional_stream, arg); + GPR_TIMER_END("chttp2:locked_action", 0); + + finish_global_actions(exec_ctx, t); + } else { + gpr_mu_unlock(&t->executor.mu); + + hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg); + hdr->stream = optional_stream; + hdr->action = action; + if (sizeof_arg == 0) { + hdr->arg = arg; + } else { + hdr->arg = hdr + 1; + memcpy(hdr->arg, arg, sizeof_arg); + } + + gpr_mu_lock(&t->executor.mu); + if (!t->executor.global_active) { + /* global lock was released while allocating memory: release & retry */ + gpr_free(hdr); + continue; + } + hdr->next = NULL; + if (t->executor.pending_actions_head != NULL) { + t->executor.pending_actions_tail = + t->executor.pending_actions_tail->next = hdr; + } else { + t->executor.pending_actions_tail = t->executor.pending_actions_head = + hdr; + } + REF_TRANSPORT(t, "pending_action"); + gpr_mu_unlock(&t->executor.mu); + } + break; + } + + UNREF_TRANSPORT(exec_ctx, t, "run_global"); + + GPR_TIMER_END("grpc_chttp2_run_with_global_lock", 0); } /******************************************************************************* @@ -709,12 +816,11 @@ static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp, void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, bool covered_by_poller, const char *reason) { - GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0); - /* Perform state checks, and transition to a scheduled state if appropriate. - If we are inactive, schedule a write chain to begin once the transport - combiner finishes any executions in its current batch (which may be - scheduled AFTER this code executes). The write chain will: + Each time we finish the global lock execution, we check if we need to + write. If we do: + - (if there is a poller surrounding the write) schedule + initiate_writing, which locks and calls initiate_writing_locked to... - call start_writing, which verifies (under the global lock) that there are things that need to be written by calling grpc_chttp2_unlocking_check_writes, and if so schedules writing_action @@ -724,28 +830,31 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, to do *another* write immediately, and if so loops back to start_writing. - Current problems: + Current problems: - too much lock entry/exiting - the writing thread can become stuck indefinitely (punt through the workqueue periodically to fix) */ grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); switch (t->executor.write_state) { - case GRPC_CHTTP2_WRITES_CORKED: - break; case GRPC_CHTTP2_WRITING_INACTIVE: - set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, reason); - REF_TRANSPORT(t, "writing"); - grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, - &t->initiate_writing, GRPC_ERROR_NONE, - covered_by_poller); + set_write_state(t, covered_by_poller + ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER + : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + reason); break; - case GRPC_CHTTP2_WRITE_SCHEDULED: + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + /* nothing to do: write already requested */ + break; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: if (covered_by_poller) { /* upgrade to note poller is available to cover the write */ - grpc_combiner_force_async_finally(t->executor.combiner); + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason); } break; + case GRPC_CHTTP2_WRITE_SCHEDULED: + /* nothing to do: write already scheduled */ + break; case GRPC_CHTTP2_WRITING: set_write_state(t, covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER @@ -762,15 +871,15 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, } break; } - GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - GPR_TIMER_BEGIN("start_writing", 0); - GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED); + GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED || + t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER); if (!t->closed && grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing"); + REF_TRANSPORT(t, "writing"); prevent_endpoint_shutdown(t); grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); } else { @@ -781,10 +890,25 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "start_writing:nothing_to_write"); } - end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE); - UNREF_TRANSPORT(exec_ctx, t, "writing"); + end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE, "Nothing to write"); + if (t->ep && !t->endpoint_reading) { + destroy_endpoint(exec_ctx, t); + } } - GPR_TIMER_END("start_writing", 0); +} + +static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, + void *arg_ignored) { + start_writing(exec_ctx, t); + UNREF_TRANSPORT(exec_ctx, t, "initiate_writing"); +} + +static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked, + NULL, 0); } void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, @@ -818,10 +942,15 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* error may be GRPC_ERROR_NONE if there is no error allocated yet. In that case, use "reason" as the text for a new error. */ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_error *error) { + grpc_chttp2_transport *t, grpc_error *error, + const char *reason) { grpc_chttp2_stream_global *stream_global; while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) { + if (error == GRPC_ERROR_NONE && reason != NULL) { + /* create error object. */ + error = GRPC_ERROR_CREATE(reason); + } fail_pending_writes(exec_ctx, &t->global, stream_global, GRPC_ERROR_REF(error)); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); @@ -829,10 +958,12 @@ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - GPR_TIMER_BEGIN("terminate_writing_with_lock", 0); - grpc_chttp2_transport *t = tp; +static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_ignored, + void *a) { + grpc_error *error = a; + allow_endpoint_shutdown_locked(exec_ctx, t); if (error != GRPC_ERROR_NONE) { @@ -841,46 +972,39 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); - end_waiting_for_write(exec_ctx, t, GRPC_ERROR_REF(error)); + end_waiting_for_write(exec_ctx, t, error, NULL); switch (t->executor.write_state) { - case GRPC_CHTTP2_WRITES_CORKED: case GRPC_CHTTP2_WRITING_INACTIVE: + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: case GRPC_CHTTP2_WRITE_SCHEDULED: GPR_UNREACHABLE_CODE(break); case GRPC_CHTTP2_WRITING: - GPR_TIMER_MARK("state=writing", 0); set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing"); break; case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: - GPR_TIMER_MARK("state=writing_stale_with_poller", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing"); - REF_TRANSPORT(t, "writing"); - grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, - &t->initiate_writing, GRPC_ERROR_NONE, - true); + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, + "terminate_writing"); break; case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: - GPR_TIMER_MARK("state=writing_stale_no_poller", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing"); - REF_TRANSPORT(t, "writing"); - grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, - &t->initiate_writing, GRPC_ERROR_NONE, - false); + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + "terminate_writing"); break; } + if (t->ep && !t->endpoint_reading) { + destroy_endpoint(exec_ctx, t); + } + UNREF_TRANSPORT(exec_ctx, t, "writing"); - GPR_TIMER_END("terminate_writing_with_lock", 0); } void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, void *transport_writing, grpc_error *error) { - GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0); grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); - grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->terminate_writing, - GRPC_ERROR_REF(error)); - GPR_TIMER_END("grpc_chttp2_terminate_writing", 0); + grpc_chttp2_run_with_global_lock( + exec_ctx, t, NULL, terminate_writing_with_lock, GRPC_ERROR_REF(error), 0); } static void writing_action(grpc_exec_ctx *exec_ctx, void *gt, @@ -1024,22 +1148,15 @@ static int contains_non_ok_status( static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} -static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, - grpc_error *error_ignored) { +static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *stream_op) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); grpc_transport_stream_op *op = stream_op; - grpc_chttp2_transport *t = op->transport_private.args[0]; - grpc_chttp2_stream *s = op->transport_private.args[1]; grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_stream_global *stream_global = &s->global; - if (grpc_http_trace) { - char *str = grpc_transport_stream_op_string(op); - gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s", str); - gpr_free(str); - } - grpc_closure *on_complete = op->on_complete; if (on_complete == NULL) { on_complete = grpc_closure_create(do_nothing, NULL); @@ -1094,8 +1211,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else { if (contains_non_ok_status(transport_global, op->send_initial_metadata)) { stream_global->seen_error = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (!stream_global->write_closed) { if (transport_global->is_client) { @@ -1162,8 +1278,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) { stream_global->seen_error = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (stream_global->write_closed) { stream_global->send_trailing_metadata = NULL; @@ -1188,8 +1303,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, stream_global->recv_initial_metadata_ready = op->recv_initial_metadata_ready; stream_global->recv_initial_metadata = op->recv_initial_metadata; - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (op->recv_message != NULL) { @@ -1203,8 +1317,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, exec_ctx, transport_global, stream_global, transport_global->stream_lookahead, 0); } - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (op->recv_trailing_metadata != NULL) { @@ -1213,30 +1326,21 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, add_closure_barrier(on_complete); stream_global->recv_trailing_metadata = op->recv_trailing_metadata; stream_global->final_metadata_requested = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } grpc_chttp2_complete_closure_step(exec_ctx, transport_global, stream_global, &on_complete, GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op_locked", 0); - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "perform_stream_op"); } static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op *op) { - GPR_TIMER_BEGIN("perform_stream_op", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked, - op); - op->transport_private.args[0] = gt; - op->transport_private.args[1] = gs; - GRPC_CHTTP2_STREAM_REF(&s->global, "perform_stream_op"); - grpc_combiner_execute(exec_ctx, t->executor.combiner, - &op->transport_private.closure, GRPC_ERROR_NONE); - GPR_TIMER_END("perform_stream_op", 0); + grpc_chttp2_run_with_global_lock(exec_ctx, t, s, perform_stream_op_locked, op, + sizeof(*op)); } static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1258,20 +1362,13 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping"); } -typedef struct ack_ping_args { - grpc_closure closure; - grpc_chttp2_transport *t; - uint8_t opaque_8bytes[8]; -} ack_ping_args; - -static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a, - grpc_error *error_ignored) { - ack_ping_args *args = a; +static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *opaque_8bytes) { grpc_chttp2_outstanding_ping *ping; - grpc_chttp2_transport_global *transport_global = &args->t->global; + grpc_chttp2_transport_global *transport_global = &t->global; for (ping = transport_global->pings.next; ping != &transport_global->pings; ping = ping->next) { - if (0 == memcmp(args->opaque_8bytes, ping->id, 8)) { + if (0 == memcmp(opaque_8bytes, ping->id, 8)) { grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; @@ -1279,27 +1376,21 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a, break; } } - UNREF_TRANSPORT(exec_ctx, args->t, "ack_ping"); - gpr_free(args); } void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, const uint8_t *opaque_8bytes) { - ack_ping_args *args = gpr_malloc(sizeof(*args)); - args->t = TRANSPORT_FROM_PARSING(transport_parsing); - memcpy(args->opaque_8bytes, opaque_8bytes, sizeof(args->opaque_8bytes)); - grpc_closure_init(&args->closure, ack_ping_locked, args); - REF_TRANSPORT(args->t, "ack_ping"); - grpc_combiner_execute(exec_ctx, args->t->executor.combiner, &args->closure, - GRPC_ERROR_NONE); + grpc_chttp2_run_with_global_lock( + exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing), NULL, + ack_ping_locked, (void *)opaque_8bytes, 8); } static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, - void *stream_op, - grpc_error *error_ignored) { + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, + void *stream_op) { grpc_transport_op *op = stream_op; - grpc_chttp2_transport *t = op->transport_private.args[0]; grpc_error *close_transport = op->disconnect_with_error; /* If there's a set_accept_stream ensure that we're not parsing @@ -1311,6 +1402,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, return; } + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, @@ -1336,11 +1429,11 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->bind_pollset) { - grpc_endpoint_add_to_pollset(exec_ctx, t->ep, op->bind_pollset); + add_to_pollset_locked(exec_ctx, t, NULL, op->bind_pollset); } if (op->bind_pollset_set) { - grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set); + add_to_pollset_set_locked(exec_ctx, t, NULL, op->bind_pollset_set); } if (op->send_ping) { @@ -1350,21 +1443,13 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, if (close_transport != GRPC_ERROR_NONE) { close_transport_locked(exec_ctx, t, close_transport); } - - grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); - - UNREF_TRANSPORT(exec_ctx, t, "transport_op"); } static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - op->transport_private.args[0] = gt; - grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked, - op); - REF_TRANSPORT(t, "transport_op"); - grpc_combiner_execute(exec_ctx, t->executor.combiner, - &op->transport_private.closure, GRPC_ERROR_NONE); + grpc_chttp2_run_with_global_lock( + exec_ctx, t, NULL, perform_transport_op_locked, op, sizeof(*op)); } /******************************************************************************* @@ -1373,7 +1458,6 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { - GPR_TIMER_BEGIN("check_read_ops", 0); grpc_chttp2_stream_global *stream_global; grpc_byte_stream *bs; while ( @@ -1383,7 +1467,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, if (stream_global->seen_error) { while ((bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); } if (stream_global->exceeded_metadata_size) { cancel_from_api( @@ -1406,7 +1490,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, stream_global->seen_error && (bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); } if (stream_global->incoming_frames.head != NULL) { *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( @@ -1427,7 +1511,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, if (stream_global->seen_error) { while ((bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); } if (stream_global->exceeded_metadata_size) { cancel_from_api( @@ -1448,7 +1532,6 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, } } } - GPR_TIMER_END("check_read_ops", 0); } static void decrement_active_streams_locked( @@ -1456,8 +1539,7 @@ static void decrement_active_streams_locked( grpc_chttp2_stream_global *stream_global) { if ((stream_global->all_incoming_byte_streams_finished = gpr_unref(&stream_global->active_streams))) { - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } } @@ -1561,8 +1643,7 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, } if (due_to_error != GRPC_ERROR_NONE && !stream_global->seen_error) { stream_global->seen_error = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, 1, due_to_error); @@ -1574,8 +1655,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_status_code status, gpr_slice *slice) { if (status != GRPC_STATUS_OK) { stream_global->seen_error = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } /* stream_global->recv_trailing_metadata_finished gives us a last chance replacement: we've received trailing metadata, @@ -1599,8 +1679,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_mdstr_from_slice(gpr_slice_ref(*slice)))); } stream_global->published_trailing_metadata = true; - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (slice) { gpr_slice_unref(*slice); @@ -1660,8 +1739,7 @@ void grpc_chttp2_mark_stream_closed( GRPC_ERROR_UNREF(error); return; } - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); if (close_reads && !stream_global->read_closed) { stream_global->read_closed_error = GRPC_ERROR_REF(error); stream_global->read_closed = true; @@ -1842,10 +1920,6 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { - if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { - error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE); - } close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); end_all_the_calls(exec_ctx, t, error); } @@ -1881,12 +1955,16 @@ static void update_global_window(void *args, uint32_t id, void *stream) { * INPUT PROCESSING - PARSING */ +static void reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg); static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); +static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg); +static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg); static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { @@ -1894,20 +1972,16 @@ static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, reading_action_locked -> (parse_unlocked -> post_parse_locked)? -> post_reading_action_locked */ - GPR_TIMER_BEGIN("reading_action", 0); - grpc_chttp2_transport *t = tp; - grpc_combiner_execute(exec_ctx, t->executor.combiner, - &t->reading_action_locked, GRPC_ERROR_REF(error)); - GPR_TIMER_END("reading_action", 0); + grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked, + GRPC_ERROR_REF(error), 0); } -static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - GPR_TIMER_BEGIN("reading_action_locked", 0); - - grpc_chttp2_transport *t = tp; +static void reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg) { grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; + grpc_error *error = arg; GPR_ASSERT(!t->executor.parsing_active); if (!t->closed) { @@ -1916,13 +1990,10 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); grpc_chttp2_prepare_to_read(transport_global, transport_parsing); - grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, GRPC_ERROR_REF(error), - NULL); + grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL); } else { - post_reading_action_locked(exec_ctx, t, error); + post_reading_action_locked(exec_ctx, t, s_unused, arg); } - - GPR_TIMER_END("reading_action_locked", 0); } static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, @@ -1974,15 +2045,13 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) { GRPC_ERROR_UNREF(errors[i]); } - grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->post_parse_locked, - err); GPR_TIMER_END("reading_action.parse", 0); + grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, err, + 0); } -static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - GPR_TIMER_BEGIN("post_parse_locked", 0); - grpc_chttp2_transport *t = arg; +static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg) { grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; /* copy parsing qbuf to global qbuf */ @@ -2008,7 +2077,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg, if (t->post_parsing_op) { grpc_transport_op *op = t->post_parsing_op; t->post_parsing_op = NULL; - perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE); + perform_transport_op_locked(exec_ctx, t, NULL, op); gpr_free(op); } /* if a stream is in the stream map, and gets cancelled, we need to @@ -2025,32 +2094,39 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } - post_reading_action_locked(exec_ctx, t, error); - GPR_TIMER_END("post_parse_locked", 0); + post_reading_action_locked(exec_ctx, t, s_unused, arg); } -static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - GPR_TIMER_BEGIN("post_reading_action_locked", 0); - grpc_chttp2_transport *t = arg; +static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, + void *arg) { + grpc_error *error = arg; bool keep_reading = false; - GRPC_ERROR_REF(error); if (error == GRPC_ERROR_NONE && t->closed) { error = GRPC_ERROR_CREATE("Transport closed"); } if (error != GRPC_ERROR_NONE) { + if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + } drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; if (grpc_http_write_state_trace) { gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t, write_state_name(t->executor.write_state)); } + if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) { + destroy_endpoint(exec_ctx, t); + } } else if (!t->closed) { keep_reading = true; REF_TRANSPORT(t, "keep_reading"); prevent_endpoint_shutdown(t); } gpr_slice_buffer_reset_and_unref(&t->read_buffer); + GRPC_ERROR_UNREF(error); if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action); @@ -2059,9 +2135,6 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { UNREF_TRANSPORT(exec_ctx, t, "reading_action"); } - GRPC_ERROR_UNREF(error); - - GPR_TIMER_END("post_reading_action_locked", 0); } /******************************************************************************* @@ -2083,16 +2156,36 @@ static void connectivity_state_set( * POLLSET STUFF */ +static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *pollset) { + if (t->ep) { + grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset); + } +} + +static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, + void *pollset_set) { + if (t->ep) { + grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set); + } +} + static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_pollset *pollset) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset); + /* TODO(ctiller): keep pollset alive */ + grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt, + (grpc_chttp2_stream *)gs, + add_to_pollset_locked, pollset, 0); } static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_pollset_set *pollset_set) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set); + grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt, + (grpc_chttp2_stream *)gs, + add_to_pollset_set_locked, pollset_set, 0); } /******************************************************************************* @@ -2104,7 +2197,6 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, if (gpr_unref(&bs->refs)) { GRPC_ERROR_UNREF(bs->error); gpr_slice_buffer_destroy(&bs->slices); - gpr_mu_destroy(&bs->slice_mu); gpr_free(bs); } } @@ -2150,34 +2242,38 @@ static void incoming_byte_stream_update_flow_control( } } +typedef struct { + grpc_chttp2_incoming_byte_stream *byte_stream; + gpr_slice *slice; + size_t max_size_hint; + grpc_closure *on_complete; +} incoming_byte_stream_next_arg; + static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, - void *argp, - grpc_error *error_ignored) { - grpc_chttp2_incoming_byte_stream *bs = argp; + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *argp) { + incoming_byte_stream_next_arg *arg = argp; + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)arg->byte_stream; grpc_chttp2_transport_global *transport_global = &bs->transport->global; grpc_chttp2_stream_global *stream_global = &bs->stream->global; if (bs->is_tail) { - gpr_mu_lock(&bs->slice_mu); - size_t cur_length = bs->slices.length; - gpr_mu_unlock(&bs->slice_mu); - incoming_byte_stream_update_flow_control( - exec_ctx, transport_global, stream_global, - bs->next_action.max_size_hint, cur_length); - } - gpr_mu_lock(&bs->slice_mu); + incoming_byte_stream_update_flow_control(exec_ctx, transport_global, + stream_global, arg->max_size_hint, + bs->slices.length); + } if (bs->slices.count > 0) { - *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices); - grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE, - NULL); + *arg->slice = gpr_slice_buffer_take_first(&bs->slices); + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL); } else if (bs->error != GRPC_ERROR_NONE) { - grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete, - GRPC_ERROR_REF(bs->error), NULL); + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error), + NULL); } else { - bs->on_next = bs->next_action.on_complete; - bs->next = bs->next_action.slice; + bs->on_next = arg->on_complete; + bs->next = arg->slice; } - gpr_mu_unlock(&bs->slice_mu); incoming_byte_stream_unref(exec_ctx, bs); } @@ -2185,18 +2281,13 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, gpr_slice *slice, size_t max_size_hint, grpc_closure *on_complete) { - GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; + incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete}; gpr_ref(&bs->refs); - bs->next_action.slice = slice; - bs->next_action.max_size_hint = max_size_hint; - bs->next_action.on_complete = on_complete; - grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked, - bs); - grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner, - &bs->next_action.closure, GRPC_ERROR_NONE); - GPR_TIMER_END("incoming_byte_stream_next", 0); + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_next_locked, &arg, + sizeof(arg)); return 0; } @@ -2204,8 +2295,9 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, - void *byte_stream, - grpc_error *error_ignored) { + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *byte_stream) { grpc_chttp2_incoming_byte_stream *bs = byte_stream; GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); decrement_active_streams_locked(exec_ctx, &bs->transport->global, @@ -2215,14 +2307,10 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { - GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0); grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; - grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked, - bs); - grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner, - &bs->destroy_action, GRPC_ERROR_NONE); - GPR_TIMER_END("incoming_byte_stream_destroy", 0); + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_destroy_locked, bs, 0); } typedef struct { @@ -2230,45 +2318,90 @@ typedef struct { gpr_slice slice; } incoming_byte_stream_push_arg; -void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs, - gpr_slice slice) { - gpr_mu_lock(&bs->slice_mu); +static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *argp) { + incoming_byte_stream_push_arg *arg = argp; + grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream; if (bs->on_next != NULL) { - *bs->next = slice; + *bs->next = arg->slice; grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); bs->on_next = NULL; } else { - gpr_slice_buffer_add(&bs->slices, slice); + gpr_slice_buffer_add(&bs->slices, arg->slice); } - gpr_mu_unlock(&bs->slice_mu); + incoming_byte_stream_unref(exec_ctx, bs); } -static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx, - void *bsp, grpc_error *error) { - grpc_chttp2_incoming_byte_stream *bs = bsp; - if (error != GRPC_ERROR_NONE) { - grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); - bs->on_next = NULL; - GRPC_ERROR_UNREF(bs->error); - bs->error = error; - } +void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + gpr_slice slice) { + incoming_byte_stream_push_arg arg = {bs, slice}; + gpr_ref(&bs->refs); + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_push_locked, &arg, + sizeof(arg)); +} + +typedef struct { + grpc_chttp2_incoming_byte_stream *bs; + grpc_error *error; +} bs_fail_args; + +static bs_fail_args *make_bs_fail_args(grpc_chttp2_incoming_byte_stream *bs, + grpc_error *error) { + bs_fail_args *a = gpr_malloc(sizeof(*a)); + a->bs = bs; + a->error = error; + return a; +} + +static void incoming_byte_stream_finished_failed_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, + void *argp) { + bs_fail_args *a = argp; + grpc_chttp2_incoming_byte_stream *bs = a->bs; + grpc_error *error = a->error; + gpr_free(a); + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); + bs->on_next = NULL; + GRPC_ERROR_UNREF(bs->error); + bs->error = error; + incoming_byte_stream_unref(exec_ctx, bs); +} + +static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *argp) { + grpc_chttp2_incoming_byte_stream *bs = argp; incoming_byte_stream_unref(exec_ctx, bs); } void grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error, int from_parsing_thread) { - GPR_TIMER_BEGIN("grpc_chttp2_incoming_byte_stream_finished", 0); if (from_parsing_thread) { - grpc_closure_init(&bs->finished_action, - incoming_byte_stream_finished_locked, bs); - grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner, - &bs->finished_action, GRPC_ERROR_REF(error)); + if (error == GRPC_ERROR_NONE) { + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_finished_ok_locked, + bs, 0); + } else { + grpc_chttp2_run_with_global_lock( + exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_finished_failed_locked, + make_bs_fail_args(bs, error), 0); + } } else { - incoming_byte_stream_finished_locked(exec_ctx, bs, error); + if (error == GRPC_ERROR_NONE) { + incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport, + bs->stream, bs); + } else { + incoming_byte_stream_finished_failed_locked( + exec_ctx, bs->transport, bs->stream, make_bs_fail_args(bs, error)); + } } - GPR_TIMER_END("grpc_chttp2_incoming_byte_stream_finished", 0); } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( @@ -2281,7 +2414,6 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->base.flags = flags; incoming_byte_stream->base.next = incoming_byte_stream_next; incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; - gpr_mu_init(&incoming_byte_stream->slice_mu); gpr_ref_init(&incoming_byte_stream->refs, 2); incoming_byte_stream->next_message = NULL; incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 04b788b702..d67c014e54 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -48,7 +48,6 @@ #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" #include "src/core/ext/transport/chttp2/transport/stream_map.h" -#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/transport_impl.h" @@ -162,20 +161,9 @@ struct grpc_chttp2_incoming_byte_stream { grpc_chttp2_transport *transport; grpc_chttp2_stream *stream; int is_tail; - - gpr_mu slice_mu; // protects slices, on_next gpr_slice_buffer slices; grpc_closure *on_next; gpr_slice *next; - - struct { - grpc_closure closure; - gpr_slice *slice; - size_t max_size_hint; - grpc_closure *on_complete; - } next_action; - grpc_closure destroy_action; - grpc_closure finished_action; }; typedef struct { @@ -308,11 +296,23 @@ struct grpc_chttp2_transport_parsing { int64_t outgoing_window; }; +typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *arg); + +typedef struct grpc_chttp2_executor_action_header { + grpc_chttp2_stream *stream; + grpc_chttp2_locked_action action; + struct grpc_chttp2_executor_action_header *next; + void *arg; +} grpc_chttp2_executor_action_header; + typedef enum { - /** no writing activity allowed */ - GRPC_CHTTP2_WRITES_CORKED, /** no writing activity */ GRPC_CHTTP2_WRITING_INACTIVE, + /** write has been requested, but not scheduled yet */ + GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, + GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, /** write has been requested and scheduled against the workqueue */ GRPC_CHTTP2_WRITE_SCHEDULED, /** write has been initiated after being reaped from the workqueue */ @@ -333,7 +333,7 @@ struct grpc_chttp2_transport { gpr_refcount shutdown_ep_refs; struct { - grpc_combiner *combiner; + gpr_mu mu; /** is a thread currently in the global lock */ bool global_active; @@ -341,8 +341,9 @@ struct grpc_chttp2_transport { bool parsing_active; /** write execution state of the transport */ grpc_chttp2_write_state write_state; - /** has a check_read_ops been scheduled */ - bool check_read_ops_scheduled; + + grpc_chttp2_executor_action_header *pending_actions_head; + grpc_chttp2_executor_action_header *pending_actions_tail; } executor; /** is the transport destroying itself? */ @@ -379,16 +380,10 @@ struct grpc_chttp2_transport { grpc_closure writing_action; /** closure to start reading from the endpoint */ grpc_closure reading_action; - grpc_closure reading_action_locked; - grpc_closure post_parse_locked; /** closure to actually do parsing */ grpc_closure parsing_action; /** closure to initiate writing */ grpc_closure initiate_writing; - /** closure to finish writing */ - grpc_closure terminate_writing; - /** closure to flush read state up the stack */ - grpc_closure initiate_read_flush_locked; /** incoming read bytes */ gpr_slice_buffer read_buffer; @@ -532,16 +527,11 @@ struct grpc_chttp2_stream_parsing { }; struct grpc_chttp2_stream { - grpc_chttp2_transport *t; grpc_stream_refcount *refcount; grpc_chttp2_stream_global global; grpc_chttp2_stream_writing writing; grpc_chttp2_stream_parsing parsing; - grpc_closure init_stream; - grpc_closure destroy_stream; - void *destroy_stream_arg; - grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; uint8_t included[STREAM_LIST_COUNT]; }; @@ -636,7 +626,7 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( grpc_chttp2_stream_global **stream_global); void grpc_chttp2_list_add_check_read_ops( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); bool grpc_chttp2_list_remove_check_read_ops( grpc_chttp2_transport_global *transport_global, @@ -716,6 +706,12 @@ void grpc_chttp2_complete_closure_step( grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure, grpc_error *error); +void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *transport, + grpc_chttp2_stream *optional_stream, + grpc_chttp2_locked_action action, + void *arg, size_t sizeof_arg); + #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 0e6d579ba9..482cd55c44 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -177,8 +177,7 @@ void grpc_chttp2_publish_reads( stream_global->seen_error = true; stream_global->exceeded_metadata_size = stream_parsing->exceeded_metadata_size; - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } /* flush stats to global stream state */ @@ -204,8 +203,7 @@ void grpc_chttp2_publish_reads( stream_global->incoming_frames.tail->is_tail = 0; } if (stream_parsing->data_parser.incoming_frames.head != NULL) { - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } grpc_chttp2_incoming_frame_queue_merge( &stream_global->incoming_frames, @@ -221,8 +219,7 @@ void grpc_chttp2_publish_reads( GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, stream_parsing->metadata_buffer[0], stream_global->received_initial_metadata); - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (!stream_global->published_trailing_metadata && stream_parsing->got_metadata_on_parse[1]) { @@ -231,8 +228,7 @@ void grpc_chttp2_publish_reads( GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, stream_parsing->metadata_buffer[1], stream_global->received_trailing_metadata); - grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, - stream_global); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (stream_parsing->forced_close_error != GRPC_ERROR_NONE) { diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 4dc4968248..2eb5f5f632 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -298,15 +298,8 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( } void grpc_chttp2_list_add_check_read_ops( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); - if (!t->executor.check_read_ops_scheduled) { - grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, - &t->initiate_read_flush_locked, - GRPC_ERROR_NONE, false); - t->executor.check_read_ops_scheduled = true; - } stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_CHECK_READ_OPS); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 979515bd54..311b26e354 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -55,6 +55,15 @@ int grpc_chttp2_unlocking_check_writes( transport_global->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; + /* simple writes are queued to qbuf, and flushed here */ + gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf); + GPR_ASSERT(transport_global->qbuf.count == 0); + + grpc_chttp2_hpack_compressor_set_max_table_size( + &transport_writing->hpack_compressor, + transport_global->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); + if (transport_global->dirtied_local_settings && !transport_global->sent_local_settings) { gpr_slice_buffer_add( @@ -68,16 +77,6 @@ int grpc_chttp2_unlocking_check_writes( transport_global->sent_local_settings = 1; } - /* simple writes are queued to qbuf, and flushed here */ - gpr_slice_buffer_move_into(&transport_global->qbuf, - &transport_writing->outbuf); - GPR_ASSERT(transport_global->qbuf.count == 0); - - grpc_chttp2_hpack_compressor_set_max_table_size( - &transport_writing->hpack_compressor, - transport_global->settings[GRPC_PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); - GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, transport_global, outgoing_window); if (transport_writing->outgoing_window > 0) { @@ -345,7 +344,6 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, void grpc_chttp2_cleanup_writing( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { - GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0); grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; @@ -384,5 +382,4 @@ void grpc_chttp2_cleanup_writing( GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); } gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf); - GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0); } diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 0655b9353f..98f304f2da 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -32,7 +32,6 @@ */ #include "src/core/lib/channel/channel_stack.h" -#include #include #include @@ -271,27 +270,21 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { sizeof(grpc_call_stack))); } -static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) { - gpr_free(op); -} - void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, grpc_call_element *cur_elem) { - grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); - memset(op, 0, sizeof(*op)); - op->cancel_error = GRPC_ERROR_CANCELLED; - op->on_complete = grpc_closure_create(destroy_op, op); - grpc_call_next_op(exec_ctx, cur_elem, op); + grpc_transport_stream_op op; + memset(&op, 0, sizeof(op)); + op.cancel_error = GRPC_ERROR_CANCELLED; + grpc_call_next_op(exec_ctx, cur_elem, &op); } void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, grpc_call_element *cur_elem, grpc_status_code status, gpr_slice *optional_message) { - grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); - memset(op, 0, sizeof(*op)); - op->on_complete = grpc_closure_create(destroy_op, op); - grpc_transport_stream_op_add_cancellation_with_message(op, status, + grpc_transport_stream_op op; + memset(&op, 0, sizeof(op)); + grpc_transport_stream_op_add_cancellation_with_message(&op, status, optional_message); - grpc_call_next_op(exec_ctx, cur_elem, op); + grpc_call_next_op(exec_ctx, cur_elem, &op); } diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 0981d59f63..134180e619 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -60,7 +60,7 @@ typedef struct call_data { /** If true, contents of \a compression_algorithm are authoritative */ int has_compression_algorithm; - grpc_transport_stream_op *send_op; + grpc_transport_stream_op send_op; uint32_t send_length; uint32_t send_flags; gpr_slice incoming_slice; @@ -199,11 +199,11 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, calld->send_flags); - calld->send_op->send_message = &calld->replacement_stream.base; - calld->post_send = calld->send_op->on_complete; - calld->send_op->on_complete = &calld->send_done; + calld->send_op.send_message = &calld->replacement_stream.base; + calld->post_send = calld->send_op.on_complete; + calld->send_op.on_complete = &calld->send_done; - grpc_call_next_op(exec_ctx, elem, calld->send_op); + grpc_call_next_op(exec_ctx, elem, &calld->send_op); } static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { @@ -220,7 +220,7 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; - while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message, + while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message, &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); @@ -243,7 +243,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } if (op->send_message != NULL && !skip_compression(elem) && 0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) { - calld->send_op = op; + calld->send_op = *op; calld->send_length = op->send_message->length; calld->send_flags = op->send_message->flags; continue_send_message(exec_ctx, elem); diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 1ba0a5c141..0b6c3b2539 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -41,10 +41,6 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, closure->cb_arg = cb_arg; } -void grpc_closure_list_init(grpc_closure_list *closure_list) { - closure_list->head = closure_list->tail = NULL; -} - void grpc_closure_list_append(grpc_closure_list *closure_list, grpc_closure *closure, grpc_error *error) { if (closure == NULL) { diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index c1a22b6021..08e59a168e 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -37,7 +37,6 @@ #include #include #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/support/mpscq.h" struct grpc_closure; typedef struct grpc_closure grpc_closure; @@ -61,14 +60,6 @@ typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg, /** A closure over a grpc_iomgr_cb_func. */ struct grpc_closure { - /** Once queued, next indicates the next queued closure; before then, scratch - * space */ - union { - grpc_closure *next; - gpr_mpscq_node atm_next; - uintptr_t scratch; - } next_data; - /** Bound callback. */ grpc_iomgr_cb_func cb; @@ -77,6 +68,13 @@ struct grpc_closure { /** Once queued, the result of the closure. Before then: scratch space */ grpc_error *error; + + /** Once queued, next indicates the next queued closure; before then, scratch + * space */ + union { + grpc_closure *next; + uintptr_t scratch; + } next_data; }; /** Initializes \a closure with \a cb and \a cb_arg. */ @@ -89,8 +87,6 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); #define GRPC_CLOSURE_LIST_INIT \ { NULL, NULL } -void grpc_closure_list_init(grpc_closure_list *list); - /** add \a closure to the end of \a list and set \a closure's result to \a error */ void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure, diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c deleted file mode 100644 index 831bdb4aff..0000000000 --- a/src/core/lib/iomgr/combiner.c +++ /dev/null @@ -1,293 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/iomgr/combiner.h" - -#include - -#include -#include - -#include "src/core/lib/iomgr/workqueue.h" -#include "src/core/lib/profiling/timers.h" - -int grpc_combiner_trace = 0; - -#define GRPC_COMBINER_TRACE(fn) \ - do { \ - if (grpc_combiner_trace) { \ - fn; \ - } \ - } while (0) - -struct grpc_combiner { - grpc_workqueue *optional_workqueue; - gpr_mpscq queue; - // state is: - // lower bit - zero if orphaned - // other bits - number of items queued on the lock - gpr_atm state; - bool take_async_break_before_final_list; - grpc_closure_list final_list; - grpc_closure continue_finishing; -}; - -grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { - grpc_combiner *lock = gpr_malloc(sizeof(*lock)); - lock->optional_workqueue = optional_workqueue; - gpr_atm_no_barrier_store(&lock->state, 1); - gpr_mpscq_init(&lock->queue); - lock->take_async_break_before_final_list = false; - grpc_closure_list_init(&lock->final_list); - GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); - return lock; -} - -static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock)); - GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); - gpr_mpscq_destroy(&lock->queue); - GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner"); - gpr_free(lock); -} - -void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -1); - GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); - if (old_state == 1) { - really_destroy(exec_ctx, lock); - } -} - -static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); -static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); - -static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - GPR_TIMER_BEGIN("combiner.continue_executing_mainline", 0); - grpc_combiner *lock = arg; - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p continue_finishing_mainline", lock)); - GPR_ASSERT(exec_ctx->active_combiner == NULL); - exec_ctx->active_combiner = lock; - if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock); - GPR_ASSERT(exec_ctx->active_combiner == lock); - exec_ctx->active_combiner = NULL; - GPR_TIMER_END("combiner.continue_executing_mainline", 0); -} - -static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - GPR_TIMER_BEGIN("combiner.execute_final", 0); - grpc_closure *c = lock->final_list.head; - GPR_ASSERT(c != NULL); - grpc_closure_list_init(&lock->final_list); - lock->take_async_break_before_final_list = false; - int loops = 0; - while (c != NULL) { - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error; - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - loops++; - } - GPR_TIMER_END("combiner.execute_final", 0); -} - -static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - GPR_TIMER_BEGIN("combiner.continue_executing_final", 0); - grpc_combiner *lock = arg; - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p continue_executing_final", lock)); - GPR_ASSERT(exec_ctx->active_combiner == NULL); - exec_ctx->active_combiner = lock; - // quick peek to see if new things have turned up on the queue: if so, go back - // to executing them before the final list - if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) { - if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock); - } else { - execute_final(exec_ctx, lock); - finish(exec_ctx, lock); - } - GPR_ASSERT(exec_ctx->active_combiner == lock); - exec_ctx->active_combiner = NULL; - GPR_TIMER_END("combiner.continue_executing_final", 0); -} - -static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - GPR_TIMER_BEGIN("combiner.start_execute_final", 0); - GPR_ASSERT(exec_ctx->active_combiner == lock); - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, - "C:%p start_execute_final take_async_break_before_final_list=%d", - lock, lock->take_async_break_before_final_list)); - if (lock->take_async_break_before_final_list) { - grpc_closure_init(&lock->continue_finishing, continue_executing_final, - lock); - grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, - GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched")); - GPR_TIMER_END("combiner.start_execute_final", 0); - return false; - } else { - execute_final(exec_ctx, lock); - GPR_TIMER_END("combiner.start_execute_final", 0); - return true; - } -} - -static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - GPR_TIMER_BEGIN("combiner.maybe_finish_one", 0); - gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n)); - GPR_ASSERT(exec_ctx->active_combiner == lock); - if (n == NULL) { - // Queue is in an transiently inconsistent state: a new item is being queued - // but is not visible to this thread yet. - // Use this as a cue that we should go off and do something else for a while - // (and come back later) - grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline, - lock); - grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, - GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched")); - GPR_TIMER_END("combiner.maybe_finish_one", 0); - return false; - } - grpc_closure *cl = (grpc_closure *)n; - grpc_error *error = cl->error; - cl->cb(exec_ctx, cl->cb_arg, error); - GRPC_ERROR_UNREF(error); - GPR_TIMER_END("combiner.maybe_finish_one", 0); - return true; -} - -static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock); - GPR_TIMER_BEGIN("combiner.finish", 0); - int loops = 0; - do { - executor = maybe_finish_one; - gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2); - GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, - "C:%p finish[%d] old_state=%" PRIdPTR, lock, - loops, old_state)); - switch (old_state) { - default: - // we have multiple queued work items: just continue executing them - break; - case 5: // we're down to one queued item: if it's the final list we - case 4: // should do that - if (!grpc_closure_list_empty(lock->final_list)) { - executor = start_execute_final; - } - break; - case 3: // had one count, one unorphaned --> unlocked unorphaned - GPR_TIMER_END("combiner.finish", 0); - return; - case 2: // and one count, one orphaned --> unlocked and orphaned - really_destroy(exec_ctx, lock); - GPR_TIMER_END("combiner.finish", 0); - return; - case 1: - case 0: - // these values are illegal - representing an already unlocked or - // deleted lock - GPR_UNREACHABLE_CODE(return ); - } - loops++; - } while (executor(exec_ctx, lock)); - GPR_TIMER_END("combiner.finish", 0); -} - -void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *cl, grpc_error *error) { - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl)); - GPR_TIMER_BEGIN("combiner.execute", 0); - gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); - GPR_ASSERT(last & 1); // ensure lock has not been destroyed - if (last == 1) { - exec_ctx->active_combiner = lock; - GPR_TIMER_BEGIN("combiner.execute_first_cb", 0); - cl->cb(exec_ctx, cl->cb_arg, error); - GPR_TIMER_END("combiner.execute_first_cb", 0); - GRPC_ERROR_UNREF(error); - finish(exec_ctx, lock); - GPR_ASSERT(exec_ctx->active_combiner == lock); - exec_ctx->active_combiner = NULL; - } else { - cl->error = error; - gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); - } - GPR_TIMER_END("combiner.execute", 0); -} - -static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, - grpc_error *error) { - grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, - GRPC_ERROR_REF(error), false); -} - -void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error, - bool force_async_break) { - GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, - "C:%p grpc_combiner_execute_finally c=%p force_async_break=%d; ac=%p", - lock, closure, force_async_break, exec_ctx->active_combiner)); - GPR_TIMER_BEGIN("combiner.execute_finally", 0); - if (exec_ctx->active_combiner != lock) { - GPR_TIMER_MARK("slowpath", 0); - grpc_combiner_execute(exec_ctx, lock, - grpc_closure_create(enqueue_finally, closure), error); - GPR_TIMER_END("combiner.execute_finally", 0); - return; - } - - if (force_async_break) { - lock->take_async_break_before_final_list = true; - } - if (grpc_closure_list_empty(lock->final_list)) { - gpr_atm_full_fetch_add(&lock->state, 2); - } - grpc_closure_list_append(&lock->final_list, closure, error); - GPR_TIMER_END("combiner.execute_finally", 0); -} - -void grpc_combiner_force_async_finally(grpc_combiner *lock) { - lock->take_async_break_before_final_list = true; -} diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h deleted file mode 100644 index 1409db24b9..0000000000 --- a/src/core/lib/iomgr/combiner.h +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_COMBINER_H -#define GRPC_CORE_LIB_IOMGR_COMBINER_H - -#include - -#include -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/support/mpscq.h" - -// Provides serialized access to some resource. -// Each action queued on a combiner is executed serially in a borrowed thread. -// The actual thread executing actions may change over time (but there will only -// every be one at a time). - -// Initialize the lock, with an optional workqueue to shift load to when -// necessary -grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); -// Destroy the lock -void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); -// Execute \a action within the lock. -void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error); -// Execute \a action within the lock just prior to unlocking. -// if \a hint_async_break is true, the combiner tries to hand execution to -// another thread before finishing the primary queue of combined closures and -// executing the finally list. -// Deprecation warning: \a hint_async_break will be removed in a future version -// Takes a very slow and round-about path if not called from a -// grpc_combiner_execute closure. -void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error, - bool hint_async_break); -// Deprecated: force the finally list execution onto another thread -void grpc_combiner_force_async_finally(grpc_combiner *lock); - -extern int grpc_combiner_trace; - -#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */ diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index e366961936..149c55663c 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -332,7 +332,7 @@ grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) { return new; } -static const char *no_error_string = "\"No Error\""; +static const char *no_error_string = "null"; static const char *oom_error_string = "\"Out of memory\""; static const char *cancelled_error_string = "\"Cancelled\""; diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 740920d760..eba347125e 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1531,8 +1531,6 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - GPR_TIMER_BEGIN("pollset_add_fd", 0); - grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&pollset->mu); @@ -1645,8 +1643,6 @@ retry: gpr_mu_unlock(&pollset->mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); - - GPR_TIMER_END("pollset_add_fd", 0); } /******************************************************************************* diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 1895ee6245..917f332f03 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -40,8 +40,8 @@ /** A workqueue represents a list of work to be executed asynchronously. Forward declared here to avoid a circular dependency with workqueue.h. */ +struct grpc_workqueue; typedef struct grpc_workqueue grpc_workqueue; -typedef struct grpc_combiner grpc_combiner; #ifndef GRPC_EXECUTION_CONTEXT_SANITIZER /** Execution context. @@ -66,15 +66,13 @@ typedef struct grpc_combiner grpc_combiner; */ struct grpc_exec_ctx { grpc_closure_list closure_list; - /** currently active combiner: updated only via combiner.c */ - grpc_combiner *active_combiner; bool cached_ready_to_finish; void *check_ready_to_finish_arg; bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); }; #define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ - { GRPC_CLOSURE_LIST_INIT, NULL, false, finish_check_arg, finish_check } + { GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check } #else struct grpc_exec_ctx { bool cached_ready_to_finish; diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 92767721d5..974d5ae479 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -379,19 +379,10 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, } if (!tcp_flush(tcp, &error)) { - if (grpc_tcp_trace) { - gpr_log(GPR_DEBUG, "write: delayed"); - } grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { cb = tcp->write_cb; tcp->write_cb = NULL; - if (grpc_tcp_trace) { - const char *str = grpc_error_string(error); - gpr_log(GPR_DEBUG, "write: %s", str); - grpc_error_free_string(str); - } - GPR_TIMER_BEGIN("tcp_handle_write.cb", 0); cb->cb(exec_ctx, cb->cb_arg, error); GPR_TIMER_END("tcp_handle_write.cb", 0); @@ -434,16 +425,8 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (!tcp_flush(tcp, &error)) { TCP_REF(tcp, "write"); tcp->write_cb = cb; - if (grpc_tcp_trace) { - gpr_log(GPR_DEBUG, "write: delayed"); - } grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { - if (grpc_tcp_trace) { - const char *str = grpc_error_string(error); - gpr_log(GPR_DEBUG, "write: %s", str); - grpc_error_free_string(str); - } grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index b2805dc66c..7156e490d7 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -50,6 +50,10 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ +/* Deprecated: do not use. + This has *already* been removed in a future commit. */ +void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); + /* Reference counting functions. Use the macro's always (GRPC_WORKQUEUE_{REF,UNREF}). diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index ecfea68f56..e0d6dac230 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -44,7 +44,6 @@ #include #include "src/core/lib/iomgr/ev_posix.h" -#include "src/core/lib/profiling/timers.h" static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); @@ -53,7 +52,8 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, char name[32]; *workqueue = gpr_malloc(sizeof(grpc_workqueue)); gpr_ref_init(&(*workqueue)->refs, 1); - gpr_atm_no_barrier_store(&(*workqueue)->state, 1); + gpr_mu_init(&(*workqueue)->mu); + (*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL; grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd); if (err != GRPC_ERROR_NONE) { gpr_free(*workqueue); @@ -62,7 +62,6 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, sprintf(name, "workqueue:%p", (void *)(*workqueue)); (*workqueue)->wakeup_read_fd = grpc_fd_create( GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name); - gpr_mpscq_init(&(*workqueue)->queue); grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue); grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd, &(*workqueue)->read_closure); @@ -71,79 +70,57 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, static void workqueue_destroy(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd); } -static void workqueue_orphan(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (gpr_atm_full_fetch_add(&workqueue->state, -1) == 1) { - workqueue_destroy(exec_ctx, workqueue); - } -} - #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason) { - if (workqueue == NULL) return; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s", workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1, reason); - gpr_ref(&workqueue->refs); -} #else void grpc_workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue == NULL) return; +#endif gpr_ref(&workqueue->refs); } -#endif #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, const char *file, int line, const char *reason) { - if (workqueue == NULL) return; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s", workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1, reason); - if (gpr_unref(&workqueue->refs)) { - workqueue_orphan(exec_ctx, workqueue); - } -} #else void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - if (workqueue == NULL) return; +#endif if (gpr_unref(&workqueue->refs)) { - workqueue_orphan(exec_ctx, workqueue); + workqueue_destroy(exec_ctx, workqueue); } } -#endif - -static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - abort(); -} -static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - GPR_TIMER_MARK("workqueue.wakeup", 0); - grpc_error *err = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); - if (!GRPC_LOG_IF_ERROR("wakeupfd_wakeup", err)) { - drain(exec_ctx, workqueue); - } +void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { + gpr_mu_lock(&workqueue->mu); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); + gpr_mu_unlock(&workqueue->mu); } static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.on_readable", 0); - grpc_workqueue *workqueue = arg; if (error != GRPC_ERROR_NONE) { + gpr_mu_destroy(&workqueue->mu); /* HACK: let wakeup_fd code know that we stole the fd */ workqueue->wakeup_fd.read_fd = 0; grpc_wakeup_fd_destroy(&workqueue->wakeup_fd); grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy"); - GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0); gpr_free(workqueue); } else { + gpr_mu_lock(&workqueue->mu); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); - gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue); + gpr_mu_unlock(&workqueue->mu); if (error == GRPC_ERROR_NONE) { grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, &workqueue->read_closure); @@ -151,46 +128,24 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* recurse to get error handling */ on_readable(exec_ctx, arg, error); } - if (n == NULL) { - /* try again - queue in an inconsistant state */ - wakeup(exec_ctx, workqueue); - } else { - switch (gpr_atm_full_fetch_add(&workqueue->state, -2)) { - case 3: // had one count, one unorphaned --> done, unorphaned - break; - case 2: // had one count, one orphaned --> done, orphaned - workqueue_destroy(exec_ctx, workqueue); - break; - case 1: - case 0: - // these values are illegal - representing an already done or - // deleted workqueue - GPR_UNREACHABLE_CODE(break); - default: - // schedule a wakeup since there's more to do - wakeup(exec_ctx, workqueue); - } - grpc_closure *cl = (grpc_closure *)n; - grpc_error *clerr = cl->error; - cl->cb(exec_ctx, cl->cb_arg, clerr); - GRPC_ERROR_UNREF(clerr); - } } - - GPR_TIMER_END("workqueue.on_readable", 0); } void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2); - GPR_ASSERT(last & 1); - closure->error = error; - gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next); - if (last == 1) { - wakeup(exec_ctx, workqueue); + grpc_error *push_error = GRPC_ERROR_NONE; + gpr_mu_lock(&workqueue->mu); + if (grpc_closure_list_empty(workqueue->closure_list)) { + push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + } + grpc_closure_list_append(&workqueue->closure_list, closure, error); + if (push_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(push_error); + gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg); + grpc_error_free_string(msg); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); } - GPR_TIMER_END("workqueue.enqueue", 0); + gpr_mu_unlock(&workqueue->mu); } #endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h index 03ee21cef7..0f26ba58e2 100644 --- a/src/core/lib/iomgr/workqueue_posix.h +++ b/src/core/lib/iomgr/workqueue_posix.h @@ -35,17 +35,14 @@ #define GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/support/mpscq.h" struct grpc_fd; struct grpc_workqueue { gpr_refcount refs; - gpr_mpscq queue; - // state is: - // lower bit - zero if orphaned - // other bits - number of items enqueued - gpr_atm state; + + gpr_mu mu; + grpc_closure_list closure_list; grpc_wakeup_fd wakeup_fd; struct grpc_fd *wakeup_read_fd; diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c index ee81dc248e..23e2dea185 100644 --- a/src/core/lib/iomgr/workqueue_windows.c +++ b/src/core/lib/iomgr/workqueue_windows.c @@ -42,6 +42,8 @@ // context, which is at least correct, if not performant or in the spirit of // workqueues. +void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} + #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason) {} diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index b366d1410f..2a1bf4d4e3 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -40,7 +40,6 @@ #include #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/profiling/timers.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/transport/security_connector.h" @@ -219,8 +218,6 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { - GPR_TIMER_BEGIN("auth_start_transport_op", 0); - /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; @@ -261,14 +258,12 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_channel_security_connector_check_call_host( exec_ctx, chand->security_connector, call_host, chand->auth_context, on_host_checked, elem); - GPR_TIMER_END("auth_start_transport_op", 0); return; /* early exit */ } } /* pass control down the stack */ grpc_call_next_op(exec_ctx, elem, op); - GPR_TIMER_END("auth_start_transport_op", 0); } /* Constructor for call_data */ diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index acb0113ea8..0169ccd9ef 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -38,7 +38,6 @@ #include #include #include "src/core/lib/debug/trace.h" -#include "src/core/lib/profiling/timers.h" #include "src/core/lib/security/transport/tsi_error.h" #include "src/core/lib/support/string.h" #include "src/core/lib/tsi/transport_security_interface.h" @@ -249,8 +248,6 @@ static void flush_write_staging_buffer(secure_endpoint *ep, uint8_t **cur, static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, gpr_slice_buffer *slices, grpc_closure *cb) { - GPR_TIMER_BEGIN("secure_endpoint.endpoint_write", 0); - unsigned i; tsi_result result = TSI_OK; secure_endpoint *ep = (secure_endpoint *)secure_ep; @@ -326,12 +323,10 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, exec_ctx, cb, grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result), NULL); - GPR_TIMER_END("secure_endpoint.endpoint_write", 0); return; } grpc_endpoint_write(exec_ctx, ep->wrapped_ep, &ep->output_buffer, cb); - GPR_TIMER_END("secure_endpoint.endpoint_write", 0); } static void endpoint_shutdown(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index b2c6815af8..def16c8229 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -48,7 +48,7 @@ typedef struct call_data { up-call on transport_op, and remember to call our on_done_recv member after handling it. */ grpc_closure auth_on_recv; - grpc_transport_stream_op *transport_op; + grpc_transport_stream_op transport_op; grpc_metadata_array md; const grpc_metadata *consumed_md; size_t num_consumed_md; @@ -106,10 +106,6 @@ static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) { return md; } -static void destroy_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - gpr_free(arg); -} - /* called from application code */ static void on_md_processing_done( void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md, @@ -135,22 +131,21 @@ static void on_md_processing_done( grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL); } else { gpr_slice message; - grpc_transport_stream_op *close_op = gpr_malloc(sizeof(*close_op)); - memset(close_op, 0, sizeof(*close_op)); + grpc_transport_stream_op close_op; + memset(&close_op, 0, sizeof(close_op)); grpc_metadata_array_destroy(&calld->md); error_details = error_details != NULL ? error_details : "Authentication metadata processing failed."; message = gpr_slice_from_copied_string(error_details); - calld->transport_op->send_initial_metadata = NULL; - if (calld->transport_op->send_message != NULL) { - grpc_byte_stream_destroy(&exec_ctx, calld->transport_op->send_message); - calld->transport_op->send_message = NULL; + calld->transport_op.send_initial_metadata = NULL; + if (calld->transport_op.send_message != NULL) { + grpc_byte_stream_destroy(&exec_ctx, calld->transport_op.send_message); + calld->transport_op.send_message = NULL; } - calld->transport_op->send_trailing_metadata = NULL; - close_op->on_complete = grpc_closure_create(destroy_op, close_op); - grpc_transport_stream_op_add_close(close_op, status, &message); - grpc_call_next_op(&exec_ctx, elem, close_op); + calld->transport_op.send_trailing_metadata = NULL; + grpc_transport_stream_op_add_close(&close_op, status, &message); + grpc_call_next_op(&exec_ctx, elem, &close_op); grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, grpc_error_set_int(GRPC_ERROR_CREATE(error_details), GRPC_ERROR_INT_GRPC_STATUS, status), @@ -187,7 +182,7 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem, calld->recv_initial_metadata = op->recv_initial_metadata; calld->on_done_recv = op->recv_initial_metadata_ready; op->recv_initial_metadata_ready = &calld->auth_on_recv; - calld->transport_op = op; + calld->transport_op = *op; } } diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c deleted file mode 100644 index 5b9323275a..0000000000 --- a/src/core/lib/support/mpscq.c +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/support/mpscq.h" - -#include - -void gpr_mpscq_init(gpr_mpscq *q) { - gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub); - q->tail = &q->stub; - gpr_atm_no_barrier_store(&q->stub.next, (gpr_atm)NULL); -} - -void gpr_mpscq_destroy(gpr_mpscq *q) { - GPR_ASSERT(gpr_atm_no_barrier_load(&q->head) == (gpr_atm)&q->stub); - GPR_ASSERT(q->tail == &q->stub); -} - -void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { - gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); - gpr_mpscq_node *prev = - (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n); - gpr_atm_rel_store(&prev->next, (gpr_atm)n); -} - -gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { - gpr_mpscq_node *tail = q->tail; - gpr_mpscq_node *next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); - if (tail == &q->stub) { - // indicates the list is actually (ephemerally) empty - if (next == NULL) return NULL; - q->tail = next; - tail = next; - next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); - } - if (next != NULL) { - q->tail = next; - return tail; - } - gpr_mpscq_node *head = (gpr_mpscq_node *)gpr_atm_acq_load(&q->head); - if (tail != head) { - // indicates a retry is in order: we're still adding - return NULL; - } - gpr_mpscq_push(q, &q->stub); - next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); - if (next != NULL) { - q->tail = next; - return tail; - } - // indicates a retry is in order: we're still adding - return NULL; -} diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h deleted file mode 100644 index 977a117952..0000000000 --- a/src/core/lib/support/mpscq.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_SUPPORT_MPSCQ_H -#define GRPC_CORE_LIB_SUPPORT_MPSCQ_H - -#include -#include - -// Multiple-producer single-consumer lock free queue, based upon the -// implementation from Dmitry Vyukov here: -// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue - -// List node (include this in a data structure at the top, and add application -// fields after it - to simulate inheritance) -typedef struct gpr_mpscq_node { gpr_atm next; } gpr_mpscq_node; - -// Actual queue type -typedef struct gpr_mpscq { - gpr_atm head; - // make sure head & tail don't share a cacheline - char padding[GPR_CACHELINE_SIZE]; - gpr_mpscq_node *tail; - gpr_mpscq_node stub; -} gpr_mpscq; - -void gpr_mpscq_init(gpr_mpscq *q); -void gpr_mpscq_destroy(gpr_mpscq *q); -// Push a node -void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); -// Pop a node (returns NULL if no node is ready - which doesn't indicate that -// the queue is empty!!) -gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q); - -#endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */ diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 119f5e82ab..772681109a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -109,10 +109,6 @@ typedef struct batch_control { uint8_t recv_message; uint8_t recv_final_op; uint8_t is_notify_tag_closure; - - /* TODO(ctiller): now that this is inlined, figure out how much of the above - state can be eliminated */ - grpc_transport_stream_op op; } batch_control; struct grpc_call { @@ -782,7 +778,6 @@ typedef struct termination_closure { grpc_error *error; grpc_closure *op_closure; enum { TC_CANCEL, TC_CLOSE } type; - grpc_transport_stream_op op; } termination_closure; static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, @@ -802,24 +797,26 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, } static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { + grpc_transport_stream_op op; termination_closure *tc = tcp; - memset(&tc->op, 0, sizeof(tc->op)); - tc->op.cancel_error = tc->error; + memset(&op, 0, sizeof(op)); + op.cancel_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); - tc->op.on_complete = &tc->closure; - execute_op(exec_ctx, tc->call, &tc->op); + op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &op); } static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { + grpc_transport_stream_op op; termination_closure *tc = tcp; - memset(&tc->op, 0, sizeof(tc->op)); - tc->op.close_error = tc->error; + memset(&op, 0, sizeof(op)); + op.close_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); - tc->op_closure = tc->op.on_complete; - tc->op.on_complete = &tc->closure; - execute_op(exec_ctx, tc->call, &tc->op); + tc->op_closure = op.on_complete; + op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &op); } static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, @@ -1373,6 +1370,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, int is_notify_tag_closure) { + grpc_transport_stream_op stream_op; size_t i; const grpc_op *op; batch_control *bctl; @@ -1386,6 +1384,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); + memset(&stream_op, 0, sizeof(stream_op)); + /* TODO(ctiller): this feels like it could be made lock-free */ gpr_mu_lock(&call->mu); bctl = allocate_batch_control(call); @@ -1394,9 +1394,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->notify_tag = notify_tag; bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); - grpc_transport_stream_op *stream_op = &bctl->op; - memset(stream_op, 0, sizeof(*stream_op)); - if (nops == 0) { GRPC_CALL_INTERNAL_REF(call, "completion"); bctl->error = GRPC_ERROR_NONE; @@ -1474,9 +1471,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } /* TODO(ctiller): just make these the same variable? */ call->metadata_batch[0][0].deadline = call->send_deadline; - stream_op->send_initial_metadata = + stream_op.send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; - stream_op->send_initial_metadata_flags = op->flags; + stream_op.send_initial_metadata_flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1496,7 +1493,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_stream_init( &call->sending_stream, &op->data.send_message->data.raw.slice_buffer, op->flags); - stream_op->send_message = &call->sending_stream.base; + stream_op.send_message = &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ @@ -1514,7 +1511,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } bctl->send_final_op = 1; call->sent_final_op = 1; - stream_op->send_trailing_metadata = + stream_op.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: @@ -1561,7 +1558,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } - stream_op->send_trailing_metadata = + stream_op.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_RECV_INITIAL_METADATA: @@ -1579,9 +1576,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_closure_init(&call->receiving_initial_metadata_ready, receiving_initial_metadata_ready, bctl); bctl->recv_initial_metadata = 1; - stream_op->recv_initial_metadata = + stream_op.recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - stream_op->recv_initial_metadata_ready = + stream_op.recv_initial_metadata_ready = &call->receiving_initial_metadata_ready; num_completion_callbacks_needed++; break; @@ -1598,10 +1595,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->receiving_message = 1; bctl->recv_message = 1; call->receiving_buffer = op->data.recv_message; - stream_op->recv_message = &call->receiving_stream; + stream_op.recv_message = &call->receiving_stream; grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, bctl); - stream_op->recv_message_ready = &call->receiving_stream_ready; + stream_op.recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: @@ -1627,9 +1624,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->final_op.client.status_details_capacity = op->data.recv_status_on_client.status_details_capacity; bctl->recv_final_op = 1; - stream_op->recv_trailing_metadata = + stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op->collect_stats = + stream_op.collect_stats = &call->final_info.stats.transport_stream_stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: @@ -1650,9 +1647,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; bctl->recv_final_op = 1; - stream_op->recv_trailing_metadata = + stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op->collect_stats = + stream_op.collect_stats = &call->final_info.stats.transport_stream_stats; break; } @@ -1664,12 +1661,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); - stream_op->context = call->context; + stream_op.context = call->context; grpc_closure_init(&bctl->finish_batch, finish_batch, bctl); - stream_op->on_complete = &bctl->finish_batch; + stream_op.on_complete = &bctl->finish_batch; gpr_mu_unlock(&call->mu); - execute_op(exec_ctx, call, stream_op); + execute_op(exec_ctx, call, &stream_op); done: GPR_TIMER_END("grpc_call_start_batch", 0); diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 52e78567bd..6d2b1c4935 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -334,13 +334,14 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, } void grpc_channel_destroy(grpc_channel *channel) { - grpc_transport_op *op = grpc_make_transport_op(NULL); + grpc_transport_op op; grpc_channel_element *elem; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel)); - op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed"); + memset(&op, 0, sizeof(op)); + op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed"); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); - elem->filter->start_transport_op(&exec_ctx, elem, op); + elem->filter->start_transport_op(&exec_ctx, elem, &op); GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel"); diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 0d2f01a649..9818f9d2f2 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -61,20 +61,19 @@ static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, void *tag, void *reserved) { - GRPC_API_TRACE("grpc_channel_ping(channel=%p, cq=%p, tag=%p, reserved=%p)", 4, - (channel, cq, tag, reserved)); - grpc_transport_op *op = grpc_make_transport_op(NULL); + grpc_transport_op op; ping_result *pr = gpr_malloc(sizeof(*pr)); grpc_channel_element *top_elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_ASSERT(reserved == NULL); + memset(&op, 0, sizeof(op)); pr->tag = tag; pr->cq = cq; grpc_closure_init(&pr->closure, ping_done, pr); - op->send_ping = &pr->closure; - op->bind_pollset = grpc_cq_pollset(cq); + op.send_ping = &pr->closure; + op.bind_pollset = grpc_cq_pollset(cq); grpc_cq_begin_op(cq, tag); - top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); + top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index edda0c85fa..5397913a21 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -47,7 +47,6 @@ #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" -#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/profiling/timers.h" @@ -166,7 +165,6 @@ void grpc_init(void) { grpc_register_tracer("http1", &grpc_http1_trace); grpc_register_tracer("compression", &grpc_compression_trace); grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace); - grpc_register_tracer("combiner", &grpc_combiner_trace); // Default pluck trace to 1 grpc_cq_pluck_trace = 1; grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace); diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index d32c884e8e..19b78369dd 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -97,14 +97,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, GRPC_ERROR_NONE, NULL); } + if (op->on_consumed != NULL) { + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + } if (op->send_ping != NULL) { grpc_exec_ctx_sched(exec_ctx, op->send_ping, GRPC_ERROR_CREATE("lame client channel"), NULL); } GRPC_ERROR_UNREF(op->disconnect_with_error); - if (op->on_consumed != NULL) { - grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); - } } static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 56fb80e92e..55e6d99057 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -273,20 +273,23 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, } static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, - int send_goaway, grpc_error *send_disconnect) { - struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc)); - grpc_closure_init(&sc->closure, shutdown_cleanup, sc); - grpc_transport_op *op = grpc_make_transport_op(&sc->closure); + bool send_goaway, grpc_error *send_disconnect) { + grpc_transport_op op; + struct shutdown_cleanup_args *sc; grpc_channel_element *elem; - op->send_goaway = send_goaway; + memset(&op, 0, sizeof(op)); + op.send_goaway = send_goaway; + sc = gpr_malloc(sizeof(*sc)); sc->slice = gpr_slice_from_copied_string("Server shutdown"); - op->goaway_message = &sc->slice; - op->goaway_status = GRPC_STATUS_OK; - op->disconnect_with_error = send_disconnect; + op.goaway_message = &sc->slice; + op.goaway_status = GRPC_STATUS_OK; + op.disconnect_with_error = send_disconnect; + grpc_closure_init(&sc->closure, shutdown_cleanup, sc); + op.on_consumed = &sc->closure; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); - elem->filter->start_transport_op(exec_ctx, elem, op); + elem->filter->start_transport_op(exec_ctx, elem, &op); } static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, @@ -429,8 +432,7 @@ static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, server_unref(exec_ctx, server); } -static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, - grpc_error *error) { +static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { if (is_channel_orphaned(chand)) return; GPR_ASSERT(chand->server != NULL); orphan_channel(chand); @@ -439,20 +441,14 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_transport_op *op = - grpc_make_transport_op(&chand->finish_destroy_channel_closure); - op->set_accept_stream = true; + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.set_accept_stream = true; + op.on_consumed = &chand->finish_destroy_channel_closure; grpc_channel_next_op(exec_ctx, grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), - op); - - if (error != GRPC_ERROR_NONE) { - const char *msg = grpc_error_string(error); - gpr_log(GPR_INFO, "Disconnected client: %s", msg); - grpc_error_free_string(msg); - } - GRPC_ERROR_UNREF(error); + &op); } static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { @@ -849,16 +845,17 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, channel_data *chand = cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { - grpc_transport_op *op = grpc_make_transport_op(NULL); - op->on_connectivity_state_change = &chand->channel_connectivity_changed, - op->connectivity_state = &chand->connectivity_state; + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.on_connectivity_state_change = &chand->channel_connectivity_changed, + op.connectivity_state = &chand->connectivity_state; grpc_channel_next_op(exec_ctx, grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), - op); + &op); } else { gpr_mu_lock(&server->mu_global); - destroy_channel(exec_ctx, chand, GRPC_ERROR_REF(error)); + destroy_channel(exec_ctx, chand); gpr_mu_unlock(&server->mu_global); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity"); } @@ -1122,7 +1119,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, size_t slots; uint32_t probes; uint32_t max_probes = 0; - grpc_transport_op *op = NULL; + grpc_transport_op op; channel = grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); @@ -1182,16 +1179,16 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, gpr_mu_unlock(&s->mu_global); GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); - op = grpc_make_transport_op(NULL); - op->set_accept_stream = true; - op->set_accept_stream_fn = accept_stream; - op->set_accept_stream_user_data = chand; - op->on_connectivity_state_change = &chand->channel_connectivity_changed; - op->connectivity_state = &chand->connectivity_state; + memset(&op, 0, sizeof(op)); + op.set_accept_stream = true; + op.set_accept_stream_fn = accept_stream; + op.set_accept_stream_user_data = chand; + op.on_connectivity_state_change = &chand->channel_connectivity_changed; + op.connectivity_state = &chand->connectivity_state; if (gpr_atm_acq_load(&s->shutdown_flag) != 0) { - op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); + op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); } - grpc_transport_perform_op(exec_ctx, transport, op); + grpc_transport_perform_op(exec_ctx, transport, &op); } void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 08f9d7e8d9..857c3909d2 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -32,14 +32,10 @@ */ #include "src/core/lib/transport/transport.h" - -#include - #include #include #include #include - #include "src/core/lib/support/string.h" #include "src/core/lib/transport/transport_impl.h" @@ -251,26 +247,3 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op, error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); add_error(op, &op->close_error, error); } - -typedef struct { - grpc_closure outer_on_complete; - grpc_closure *inner_on_complete; - grpc_transport_op op; -} made_transport_op; - -static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - made_transport_op *op = arg; - grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error), - NULL); - gpr_free(op); -} - -grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) { - made_transport_op *op = gpr_malloc(sizeof(*op)); - grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op); - op->inner_on_complete = on_complete; - memset(&op->op, 0, sizeof(op->op)); - op->op.on_consumed = &op->outer_on_complete; - return &op->op; -} diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 8dc393fd61..26ed6cb839 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -100,11 +100,6 @@ void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, void grpc_transport_move_stats(grpc_transport_stream_stats *from, grpc_transport_stream_stats *to); -typedef struct { - grpc_closure closure; - void *args[2]; -} grpc_transport_private_op_data; - /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op { @@ -154,12 +149,6 @@ typedef struct grpc_transport_stream_op { /* Indexes correspond to grpc_context_index enum values */ grpc_call_context_element *context; - - /*************************************************************************** - * remaining fields are initialized and used at the discretion of the - * transport implementation */ - - grpc_transport_private_op_data transport_private; } grpc_transport_stream_op; /** Transport op: a set of operations to perform on a transport as a whole */ @@ -193,12 +182,6 @@ typedef struct grpc_transport_op { grpc_pollset_set *bind_pollset_set; /** send a ping, call this back if not NULL */ grpc_closure *send_ping; - - /*************************************************************************** - * remaining fields are initialized and used at the discretion of the - * transport implementation */ - - grpc_transport_private_op_data transport_private; } grpc_transport_op; /* Returns the amount of memory required to store a grpc_stream for this @@ -290,10 +273,6 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport); char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *transport); -/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to - \a on_consumed and then delete the returned transport op */ -grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed); - #ifdef __cplusplus } #endif -- cgit v1.2.3