diff options
Diffstat (limited to 'src/core')
27 files changed, 694 insertions, 351 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index 42cda4637c..b750f12623 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -134,7 +134,7 @@ static void client_init_call_elem(grpc_exec_ctx *exec_ctx, } static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { + grpc_call_element *elem, void *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */ @@ -152,7 +152,7 @@ static void server_init_call_elem(grpc_exec_ctx *exec_ctx, } static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { + grpc_call_element *elem, void *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */ diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 94360793af..473fb33b12 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -415,9 +415,10 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory) { grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data); + gpr_free(and_free_memory); } /* Constructor for channel_data */ diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 146ee5e662..08795d1358 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -268,7 +268,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection"); - gpr_atm_no_barrier_store(&c->connected_subchannel, 0xdeadbeef); + gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef); } gpr_mu_unlock(&c->mu); } @@ -644,9 +644,9 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, bool success) { grpc_subchannel_call *c = call; GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call"); - gpr_free(c); + grpc_connected_subchannel *connection = c->connection; + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), c); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 5bba056741..6e02a5e33c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -81,27 +81,25 @@ int grpc_flowctl_trace = 0; static const grpc_transport_vtable vtable; -static void lock(grpc_chttp2_transport *t); -static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); - /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(grpc_exec_ctx *exec_ctx, void *t, bool iomgr_success_ignored); +static void reading_action(grpc_exec_ctx *exec_ctx, void *t, + bool iomgr_success_ignored); +static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, + bool iomgr_success_ignored); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value); -/** Endpoint callback to process incoming data */ -static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success); - /** Start disconnection chain */ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); /** Perform a transport_op */ -static void perform_stream_op_locked( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op); +static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *transport_op); /** Cancel a stream: coming from the transport API */ static void cancel_from_api(grpc_exec_ctx *exec_ctx, @@ -118,15 +116,19 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, /** Add endpoint from this transport to pollset */ static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_pollset *pollset); + grpc_chttp2_stream *s_ignored, void *pollset); static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_pollset_set *pollset_set); + grpc_chttp2_stream *s_ignored, + void *pollset_set); /** Start new streams that have been created if we can */ static void maybe_start_some_streams( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); +static void finish_global_actions(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); + static void connectivity_state_set( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state, const char *reason); @@ -138,7 +140,10 @@ static void incoming_byte_stream_update_flow_control( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already); - +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *byte_stream); static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global); @@ -150,7 +155,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { size_t i; - gpr_mu_lock(&t->mu); + gpr_mu_lock(&t->executor.mu); GPR_ASSERT(t->ep == NULL); @@ -176,8 +181,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_destroy(&t->new_stream_map); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); - gpr_mu_unlock(&t->mu); - gpr_mu_destroy(&t->mu); + gpr_mu_unlock(&t->executor.mu); + gpr_mu_destroy(&t->executor.mu); /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ @@ -238,7 +243,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_ref_init(&t->refs, 2); /* ref is dropped at transport close() */ gpr_ref_init(&t->shutdown_ep_refs, 1); - gpr_mu_init(&t->mu); + gpr_mu_init(&t->executor.mu); t->peer_string = grpc_endpoint_get_peer(ep); t->endpoint_reading = 1; t->global.next_stream_id = is_client ? 1 : 2; @@ -262,6 +267,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_slice_buffer_init(&t->writing.outbuf); grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor); grpc_closure_init(&t->writing_action, writing_action, t); + grpc_closure_init(&t->reading_action, reading_action, t); + grpc_closure_init(&t->parsing_action, parsing_action, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); @@ -269,7 +276,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, &t->writing); - grpc_closure_init(&t->recv_data, recv_data, t); gpr_slice_buffer_init(&t->read_buffer); if (is_client) { @@ -377,14 +383,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } -static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - - lock(t); +static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_ignored, + void *arg_ignored) { t->destroying = 1; drop_connection(exec_ctx, t); - unlock(exec_ctx, t); +} +static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, destroy_transport_locked, + NULL, 0); UNREF_TRANSPORT(exec_ctx, t, "destroy"); } @@ -404,17 +414,6 @@ static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx, } } -static void allow_endpoint_shutdown_unlocked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { - if (gpr_unref(&t->shutdown_ep_refs)) { - gpr_mu_lock(&t->mu); - if (t->ep) { - grpc_endpoint_shutdown(exec_ctx, t->ep); - } - gpr_mu_unlock(&t->mu); - } -} - static void destroy_endpoint(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_endpoint_destroy(exec_ctx, t->ep); @@ -424,7 +423,9 @@ static void destroy_endpoint(grpc_exec_ctx *exec_ctx, } static void close_transport_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_ignored, + void *arg_ignored) { if (!t->closed) { t->closed = 1; connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE, @@ -464,6 +465,13 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, } #endif +static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *arg_ignored) { + grpc_chttp2_register_stream(t, s); +} + static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *server_data) { @@ -473,6 +481,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, memset(s, 0, sizeof(*s)); s->refcount = refcount; + /* We reserve one 'active stream' that's dropped when the stream is + read-closed. The others are for incoming_byte_streams that are actively + reading */ + gpr_ref_init(&s->global.active_streams, 1); GRPC_CHTTP2_STREAM_REF(&s->global, "chttp2"); grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[0]); @@ -486,10 +498,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, REF_TRANSPORT(t, "stream"); - lock(t); - grpc_chttp2_register_stream(t, s); if (server_data) { - GPR_ASSERT(t->parsing_active); + GPR_ASSERT(t->executor.parsing_active); s->global.id = (uint32_t)(uintptr_t)server_data; s->parsing.id = s->global.id; s->global.outgoing_window = @@ -502,40 +512,42 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s); s->global.in_stream_map = 1; } - unlock(exec_ctx, t); + + grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked, + NULL, 0); return 0; } -static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - int i; +static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *arg) { grpc_byte_stream *bs; GPR_TIMER_BEGIN("destroy_stream", 0); - gpr_mu_lock(&t->mu); - GPR_ASSERT((s->global.write_closed && s->global.read_closed) || s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { - close_transport_locked(exec_ctx, t); + close_transport_locked(exec_ctx, t, NULL, NULL); } - if (!t->parsing_active && s->global.id) { + if (!t->executor.parsing_active && s->global.id) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, s->global.id) == NULL); } + while ( + (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) { + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); + } + grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, &s->global); grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global); + grpc_chttp2_list_remove_check_read_ops(&t->global, &s->global); - gpr_mu_unlock(&t->mu); - - for (i = 0; i < STREAM_LIST_COUNT; i++) { + for (int i = 0; i < STREAM_LIST_COUNT; i++) { if (s->included[i]) { gpr_log(GPR_ERROR, "%s stream %d still included in list %d", t->global.is_client ? "client" : "server", s->global.id, i); @@ -543,11 +555,6 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } } - while ( - (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) { - grpc_byte_stream_destroy(exec_ctx, bs); - } - GPR_ASSERT(s->global.send_initial_metadata_finished == NULL); GPR_ASSERT(s->global.send_message_finished == NULL); GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL); @@ -566,6 +573,17 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, UNREF_TRANSPORT(exec_ctx, t, "stream"); GPR_TIMER_END("destroy_stream", 0); + + gpr_free(arg); +} + +static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, void *and_free_memory) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + + grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked, + and_free_memory, 0); } grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( @@ -594,28 +612,96 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( * LOCK MANAGEMENT */ -/* We take a grpc_chttp2_transport-global lock in response to calls coming in - from above, - and in response to data being received from below. New data to be written - is always queued, as are callbacks to process data. During unlock() we - check our todo lists and initiate callbacks and flush writes. */ - -static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } - -static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - GPR_TIMER_BEGIN("unlock", 0); - if (!t->writing_active && !t->closed && - grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing, - t->parsing_active)) { - t->writing_active = 1; - REF_TRANSPORT(t, "writing"); - grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL); - prevent_endpoint_shutdown(t); +static void finish_global_actions(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + grpc_chttp2_executor_action_header *hdr; + grpc_chttp2_executor_action_header *next; + + for (;;) { + if (!t->executor.writing_active && !t->closed && + grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing, + t->executor.parsing_active)) { + t->executor.writing_active = 1; + REF_TRANSPORT(t, "writing"); + prevent_endpoint_shutdown(t); + grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL); + } + check_read_ops(exec_ctx, &t->global); + + gpr_mu_lock(&t->executor.mu); + if (t->executor.pending_actions_head != NULL) { + hdr = t->executor.pending_actions_head; + t->executor.pending_actions_head = t->executor.pending_actions_tail = + NULL; + gpr_mu_unlock(&t->executor.mu); + while (hdr != NULL) { + hdr->action(exec_ctx, t, hdr->stream, hdr->arg); + next = hdr->next; + gpr_free(hdr); + UNREF_TRANSPORT(exec_ctx, t, "pending_action"); + hdr = next; + } + continue; + } else { + t->executor.global_active = false; + } + gpr_mu_unlock(&t->executor.mu); + break; } - check_read_ops(exec_ctx, &t->global); +} + +void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *optional_stream, + grpc_chttp2_locked_action action, + void *arg, size_t sizeof_arg) { + grpc_chttp2_executor_action_header *hdr; + + REF_TRANSPORT(t, "run_global"); + gpr_mu_lock(&t->executor.mu); + + for (;;) { + if (!t->executor.global_active) { + t->executor.global_active = 1; + gpr_mu_unlock(&t->executor.mu); + + action(exec_ctx, t, optional_stream, arg); + + finish_global_actions(exec_ctx, t); + } else { + gpr_mu_unlock(&t->executor.mu); + + hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg); + hdr->stream = optional_stream; + hdr->action = action; + if (sizeof_arg == 0) { + hdr->arg = arg; + } else { + hdr->arg = hdr + 1; + memcpy(hdr->arg, arg, sizeof_arg); + } - gpr_mu_unlock(&t->mu); - GPR_TIMER_END("unlock", 0); + gpr_mu_lock(&t->executor.mu); + if (!t->executor.global_active) { + /* global lock was released while allocating memory: release & retry */ + gpr_free(hdr); + continue; + } + hdr->next = NULL; + if (t->executor.pending_actions_head != NULL) { + t->executor.pending_actions_tail = + t->executor.pending_actions_tail->next = hdr; + } else { + t->executor.pending_actions_tail = t->executor.pending_actions_head = + hdr; + } + REF_TRANSPORT(t, "pending_action"); + gpr_mu_unlock(&t->executor.mu); + } + break; + } + + UNREF_TRANSPORT(exec_ctx, t, "run_global"); } /******************************************************************************* @@ -645,15 +731,11 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } } -void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, - void *transport_writing_ptr, bool success) { - grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr; - grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); - grpc_chttp2_stream_global *stream_global; - - GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0); - - lock(t); +static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_ignored, + void *a) { + bool success = (bool)(uintptr_t)a; allow_endpoint_shutdown_locked(exec_ctx, t); @@ -663,24 +745,30 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); + grpc_chttp2_stream_global *stream_global; while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) { fail_pending_writes(exec_ctx, stream_global); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); } - /* leave the writing flag up on shutdown to prevent further writes in unlock() + /* leave the writing flag up on shutdown to prevent further writes in + unlock() from starting */ - t->writing_active = 0; + t->executor.writing_active = 0; if (t->ep && !t->endpoint_reading) { destroy_endpoint(exec_ctx, t); } - unlock(exec_ctx, t); - UNREF_TRANSPORT(exec_ctx, t, "writing"); +} - GPR_TIMER_END("grpc_chttp2_terminate_writing", 0); +void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, + void *transport_writing, bool success) { + grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); + grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, + terminate_writing_with_lock, + (void *)(uintptr_t)success, 0); } static void writing_action(grpc_exec_ctx *exec_ctx, void *gt, @@ -806,14 +894,16 @@ static int contains_non_ok_status( static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {} -static void perform_stream_op_locked( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) { - grpc_closure *on_complete; - +static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *stream_op) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); - on_complete = op->on_complete; + grpc_transport_stream_op *op = stream_op; + grpc_chttp2_transport_global *transport_global = &t->global; + grpc_chttp2_stream_global *stream_global = &s->global; + + grpc_closure *on_complete = op->on_complete; if (on_complete == NULL) { on_complete = grpc_closure_create(do_nothing, NULL); } @@ -938,10 +1028,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - - lock(t); - perform_stream_op_locked(exec_ctx, &t->global, &s->global, op); - unlock(exec_ctx, t); + grpc_chttp2_run_with_global_lock(exec_ctx, t, s, perform_stream_op_locked, op, + sizeof(*op)); } static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { @@ -961,13 +1049,10 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); } -void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport_parsing *transport_parsing, - const uint8_t *opaque_8bytes) { +static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *opaque_8bytes) { grpc_chttp2_outstanding_ping *ping; - grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); grpc_chttp2_transport_global *transport_global = &t->global; - lock(t); for (ping = transport_global->pings.next; ping != &transport_global->pings; ping = ping->next) { if (0 == memcmp(opaque_8bytes, ping->id, 8)) { @@ -978,13 +1063,31 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, break; } } - unlock(exec_ctx, t); +} + +void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_parsing *transport_parsing, + const uint8_t *opaque_8bytes) { + grpc_chttp2_run_with_global_lock( + exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing), NULL, + ack_ping_locked, (void *)opaque_8bytes, 8); } static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_transport_op *op) { - bool close_transport = false; + grpc_chttp2_stream *s_unused, + void *stream_op) { + grpc_transport_op *op = stream_op; + bool close_transport = op->disconnect; + + /* If there's a set_accept_stream ensure that we're not parsing + to avoid changing things out from underneath */ + if (t->executor.parsing_active && op->set_accept_stream) { + GPR_ASSERT(t->post_parsing_op == NULL); + t->post_parsing_op = gpr_malloc(sizeof(*op)); + memcpy(t->post_parsing_op, op, sizeof(*op)); + return; + } grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); @@ -1010,47 +1113,31 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->bind_pollset) { - add_to_pollset_locked(exec_ctx, t, op->bind_pollset); + add_to_pollset_locked(exec_ctx, t, NULL, op->bind_pollset); } if (op->bind_pollset_set) { - add_to_pollset_set_locked(exec_ctx, t, op->bind_pollset_set); + add_to_pollset_set_locked(exec_ctx, t, NULL, op->bind_pollset_set); } if (op->send_ping) { send_ping_locked(t, op->send_ping); } - if (op->disconnect) { - close_transport_locked(exec_ctx, t); - } - if (close_transport) { - close_transport_locked(exec_ctx, t); + close_transport_locked(exec_ctx, t, NULL, NULL); } } static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - - lock(t); - - /* If there's a set_accept_stream ensure that we're not parsing - to avoid changing things out from underneath */ - if (t->parsing_active && op->set_accept_stream) { - GPR_ASSERT(t->post_parsing_op == NULL); - t->post_parsing_op = gpr_malloc(sizeof(*op)); - memcpy(t->post_parsing_op, op, sizeof(*op)); - } else { - perform_transport_op_locked(exec_ctx, t, op); - } - - unlock(exec_ctx, t); + grpc_chttp2_run_with_global_lock( + exec_ctx, t, NULL, perform_transport_op_locked, op, sizeof(*op)); } /******************************************************************************* - * INPUT PROCESSING + * INPUT PROCESSING - GENERAL */ static void check_read_ops(grpc_exec_ctx *exec_ctx, @@ -1072,7 +1159,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, while (stream_global->seen_error && (bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - grpc_byte_stream_destroy(exec_ctx, bs); + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); } if (stream_global->incoming_frames.head != NULL) { *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( @@ -1093,9 +1180,9 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, while (stream_global->seen_error && (bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - grpc_byte_stream_destroy(exec_ctx, bs); + incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); } - if (stream_global->incoming_frames.head == NULL) { + if (stream_global->all_incoming_byte_streams_finished) { grpc_chttp2_incoming_metadata_buffer_publish( &stream_global->received_trailing_metadata, stream_global->recv_trailing_metadata); @@ -1107,6 +1194,15 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, } } +static void decrement_active_streams_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + if ((stream_global->all_incoming_byte_streams_finished = + gpr_unref(&stream_global->active_streams))) { + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } +} + static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint32_t id) { size_t new_stream_count; @@ -1128,7 +1224,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { - close_transport_locked(exec_ctx, t); + close_transport_locked(exec_ctx, t, NULL, NULL); } if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing"); @@ -1229,10 +1325,11 @@ void grpc_chttp2_mark_stream_closed( stream_global->read_closed = 1; stream_global->published_initial_metadata = 1; stream_global->published_trailing_metadata = 1; + decrement_active_streams_locked(exec_ctx, transport_global, stream_global); } if (close_writes && !stream_global->write_closed) { stream_global->write_closed = 1; - if (TRANSPORT_FROM_GLOBAL(transport_global)->writing_active) { + if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) { GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes"); grpc_chttp2_list_add_closed_waiting_for_writing(transport_global, stream_global); @@ -1242,7 +1339,7 @@ void grpc_chttp2_mark_stream_closed( } if (stream_global->read_closed && stream_global->write_closed) { if (stream_global->id != 0 && - TRANSPORT_FROM_GLOBAL(transport_global)->parsing_active) { + TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) { grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, stream_global); } else { @@ -1374,7 +1471,7 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, } static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - close_transport_locked(exec_ctx, t); + close_transport_locked(exec_ctx, t, NULL, NULL); end_all_the_calls(exec_ctx, t); } @@ -1398,102 +1495,136 @@ static void update_global_window(void *args, uint32_t id, void *stream) { } } -static void read_error_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { - t->endpoint_reading = 0; - if (!t->writing_active && t->ep) { - destroy_endpoint(exec_ctx, t); - } -} +/******************************************************************************* + * INPUT PROCESSING - PARSING + */ -/* tcp read callback */ -static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { - size_t i; - int keep_reading = 0; - grpc_chttp2_transport *t = tp; +static void reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg); +static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success); +static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg); +static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg); + +static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, bool success) { + /* Control flow: + reading_action_locked -> + (parse_unlocked -> post_parse_locked)? -> + post_reading_action_locked */ + grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked, + (void *)(uintptr_t)success, 0); +} + +static void reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg) { grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; - grpc_chttp2_stream_global *stream_global; - - GPR_TIMER_BEGIN("recv_data", 0); + bool success = (bool)(uintptr_t)arg; - lock(t); - i = 0; - GPR_ASSERT(!t->parsing_active); + GPR_ASSERT(!t->executor.parsing_active); if (!t->closed) { - t->parsing_active = 1; + t->executor.parsing_active = 1; /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); grpc_chttp2_prepare_to_read(transport_global, transport_parsing); - gpr_mu_unlock(&t->mu); - GPR_TIMER_BEGIN("recv_data.parse", 0); - for (; i < t->read_buffer.count && - grpc_chttp2_perform_read(exec_ctx, transport_parsing, - t->read_buffer.slices[i]); - i++) - ; - GPR_TIMER_END("recv_data.parse", 0); - gpr_mu_lock(&t->mu); - /* copy parsing qbuf to global qbuf */ - gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); - if (i != t->read_buffer.count) { - unlock(exec_ctx, t); - lock(t); - drop_connection(exec_ctx, t); - } - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - transport_global->concurrent_stream_count = - (uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map); - if (transport_parsing->initial_window_update != 0) { - grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, - update_global_window, t); - transport_parsing->initial_window_update = 0; - } - /* handle higher level things */ - grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing); - t->parsing_active = 0; - /* handle delayed transport ops (if there is one) */ - if (t->post_parsing_op) { - grpc_transport_op *op = t->post_parsing_op; - t->post_parsing_op = NULL; - perform_transport_op_locked(exec_ctx, t, op); - gpr_free(op); - } - /* if a stream is in the stream map, and gets cancelled, we need to ensure - * we are not parsing before continuing the cancellation to keep things in - * a sane state */ - while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, - &stream_global)) { - GPR_ASSERT(stream_global->in_stream_map); - GPR_ASSERT(stream_global->write_closed); - GPR_ASSERT(stream_global->read_closed); - remove_stream(exec_ctx, t, stream_global->id); - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); - } + grpc_exec_ctx_enqueue(exec_ctx, &t->parsing_action, success, NULL); + } else { + post_reading_action_locked(exec_ctx, t, s_unused, arg); + } +} + +static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + grpc_chttp2_transport *t = arg; + GPR_TIMER_BEGIN("reading_action.parse", 0); + size_t i = 0; + for (; i < t->read_buffer.count && + grpc_chttp2_perform_read(exec_ctx, &t->parsing, + t->read_buffer.slices[i]); + i++) + ; + if (i != t->read_buffer.count) { + success = false; + } + GPR_TIMER_END("reading_action.parse", 0); + grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, + (void *)(uintptr_t)success, 0); +} + +static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, void *arg) { + grpc_chttp2_transport_global *transport_global = &t->global; + grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; + /* copy parsing qbuf to global qbuf */ + gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); + transport_global->concurrent_stream_count = + (uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map); + if (transport_parsing->initial_window_update != 0) { + grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, + update_global_window, t); + transport_parsing->initial_window_update = 0; + } + /* handle higher level things */ + grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing); + t->executor.parsing_active = 0; + /* handle delayed transport ops (if there is one) */ + if (t->post_parsing_op) { + grpc_transport_op *op = t->post_parsing_op; + t->post_parsing_op = NULL; + perform_transport_op_locked(exec_ctx, t, NULL, op); + gpr_free(op); + } + /* if a stream is in the stream map, and gets cancelled, we need to + * ensure we are not parsing before continuing the cancellation to keep + * things in a sane state */ + grpc_chttp2_stream_global *stream_global; + while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, + &stream_global)) { + GPR_ASSERT(stream_global->in_stream_map); + GPR_ASSERT(stream_global->write_closed); + GPR_ASSERT(stream_global->read_closed); + remove_stream(exec_ctx, t, stream_global->id); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } - if (!success || i != t->read_buffer.count || t->closed) { + + post_reading_action_locked(exec_ctx, t, s_unused, arg); +} + +static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, + void *arg) { + bool success = (bool)(uintptr_t)arg; + bool keep_reading = false; + if (!success || t->closed) { drop_connection(exec_ctx, t); - read_error_locked(exec_ctx, t); + t->endpoint_reading = 0; + if (!t->executor.writing_active && t->ep) { + grpc_endpoint_destroy(exec_ctx, t->ep); + t->ep = NULL; + /* safe as we still have a ref for read */ + UNREF_TRANSPORT(exec_ctx, t, "disconnect"); + } } else if (!t->closed) { - keep_reading = 1; + keep_reading = true; REF_TRANSPORT(t, "keep_reading"); prevent_endpoint_shutdown(t); } gpr_slice_buffer_reset_and_unref(&t->read_buffer); - unlock(exec_ctx, t); if (keep_reading) { - grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->recv_data); - allow_endpoint_shutdown_unlocked(exec_ctx, t); + grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action); + allow_endpoint_shutdown_locked(exec_ctx, t); UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { - UNREF_TRANSPORT(exec_ctx, t, "recv_data"); + UNREF_TRANSPORT(exec_ctx, t, "reading_action"); } - - GPR_TIMER_END("recv_data", 0); } /******************************************************************************* @@ -1517,7 +1648,7 @@ static void connectivity_state_set( static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_pollset *pollset) { + grpc_chttp2_stream *s_unused, void *pollset) { if (t->ep) { grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset); } @@ -1525,7 +1656,8 @@ static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_pollset_set *pollset_set) { + grpc_chttp2_stream *s_unused, + void *pollset_set) { if (t->ep) { grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set); } @@ -1533,24 +1665,31 @@ static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_pollset *pollset) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - lock(t); - add_to_pollset_locked(exec_ctx, t, pollset); - unlock(exec_ctx, t); + /* TODO(ctiller): keep pollset alive */ + grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt, + (grpc_chttp2_stream *)gs, + add_to_pollset_locked, pollset, 0); } static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_pollset_set *pollset_set) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - lock(t); - add_to_pollset_set_locked(exec_ctx, t, pollset_set); - unlock(exec_ctx, t); + grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt, + (grpc_chttp2_stream *)gs, + add_to_pollset_set_locked, pollset_set, 0); } /******************************************************************************* * BYTE STREAM */ +static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs) { + if (gpr_unref(&bs->refs)) { + gpr_slice_buffer_destroy(&bs->slices); + gpr_free(bs); + } +} + static void incoming_byte_stream_update_flow_control( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, @@ -1591,87 +1730,146 @@ static void incoming_byte_stream_update_flow_control( } } -static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - gpr_slice *slice, size_t max_size_hint, - grpc_closure *on_complete) { +typedef struct { + grpc_chttp2_incoming_byte_stream *byte_stream; + gpr_slice *slice; + size_t max_size_hint; + grpc_closure *on_complete; +} incoming_byte_stream_next_arg; + +static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *argp) { + incoming_byte_stream_next_arg *arg = argp; grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)byte_stream; + (grpc_chttp2_incoming_byte_stream *)arg->byte_stream; grpc_chttp2_transport_global *transport_global = &bs->transport->global; grpc_chttp2_stream_global *stream_global = &bs->stream->global; - lock(bs->transport); if (bs->is_tail) { - incoming_byte_stream_update_flow_control(transport_global, stream_global, - max_size_hint, bs->slices.length); + incoming_byte_stream_update_flow_control( + transport_global, stream_global, arg->max_size_hint, bs->slices.length); } if (bs->slices.count > 0) { - *slice = gpr_slice_buffer_take_first(&bs->slices); - unlock(exec_ctx, bs->transport); - return 1; + *arg->slice = gpr_slice_buffer_take_first(&bs->slices); + grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, true, NULL); } else if (bs->failed) { - grpc_exec_ctx_enqueue(exec_ctx, on_complete, false, NULL); - unlock(exec_ctx, bs->transport); - return 0; + grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, false, NULL); } else { - bs->on_next = on_complete; - bs->next = slice; - unlock(exec_ctx, bs->transport); - return 0; + bs->on_next = arg->on_complete; + bs->next = arg->slice; } + incoming_byte_stream_unref(exec_ctx, bs); } -static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) { - if (gpr_unref(&bs->refs)) { - gpr_slice_buffer_destroy(&bs->slices); - gpr_free(bs); - } +static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + gpr_slice *slice, size_t max_size_hint, + grpc_closure *on_complete) { + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete}; + gpr_ref(&bs->refs); + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_next_locked, &arg, + sizeof(arg)); + return 0; +} + +static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream); + +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *byte_stream) { + grpc_chttp2_incoming_byte_stream *bs = byte_stream; + GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); + decrement_active_streams_locked(exec_ctx, &bs->transport->global, + &bs->stream->global); + incoming_byte_stream_unref(exec_ctx, bs); } static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { - incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream); + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_destroy_locked, bs, 0); } -void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs, - gpr_slice slice) { - gpr_mu_lock(&bs->transport->mu); +typedef struct { + grpc_chttp2_incoming_byte_stream *byte_stream; + gpr_slice slice; +} incoming_byte_stream_push_arg; + +static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *argp) { + incoming_byte_stream_push_arg *arg = argp; + grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream; if (bs->on_next != NULL) { - *bs->next = slice; + *bs->next = arg->slice; grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, true, NULL); bs->on_next = NULL; } else { - gpr_slice_buffer_add(&bs->slices, slice); + gpr_slice_buffer_add(&bs->slices, arg->slice); } - gpr_mu_unlock(&bs->transport->mu); + incoming_byte_stream_unref(exec_ctx, bs); +} + +void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + gpr_slice slice) { + incoming_byte_stream_push_arg arg = {bs, slice}; + gpr_ref(&bs->refs); + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_push_locked, &arg, + sizeof(arg)); +} + +static void incoming_byte_stream_finished_failed_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, + void *argp) { + grpc_chttp2_incoming_byte_stream *bs = argp; + grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL); + bs->on_next = NULL; + bs->failed = 1; + incoming_byte_stream_unref(exec_ctx, bs); +} + +static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + void *argp) { + grpc_chttp2_incoming_byte_stream *bs = argp; + incoming_byte_stream_unref(exec_ctx, bs); } void grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success, int from_parsing_thread) { - if (!success) { - if (from_parsing_thread) { - gpr_mu_lock(&bs->transport->mu); - } - grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL); - bs->on_next = NULL; - bs->failed = 1; - if (from_parsing_thread) { - gpr_mu_unlock(&bs->transport->mu); + if (from_parsing_thread) { + if (success) { + grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_finished_ok_locked, + bs, 0); + } else { + incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport, + bs->stream, bs); } } else { -#ifndef NDEBUG - if (from_parsing_thread) { - gpr_mu_lock(&bs->transport->mu); - } - GPR_ASSERT(bs->on_next == NULL); - if (from_parsing_thread) { - gpr_mu_unlock(&bs->transport->mu); + if (success) { + grpc_chttp2_run_with_global_lock( + exec_ctx, bs->transport, bs->stream, + incoming_byte_stream_finished_failed_locked, bs, 0); + } else { + incoming_byte_stream_finished_failed_locked(exec_ctx, bs->transport, + bs->stream, bs); } -#endif } - incoming_byte_stream_unref(bs); } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( @@ -1688,6 +1886,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->next_message = NULL; incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing); incoming_byte_stream->stream = STREAM_FROM_PARSING(stream_parsing); + gpr_ref(&incoming_byte_stream->stream->global.active_streams); gpr_slice_buffer_init(&incoming_byte_stream->slices); incoming_byte_stream->on_next = NULL; incoming_byte_stream->is_tail = 1; @@ -1819,7 +2018,7 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_transport *transport, gpr_slice *slices, size_t nslices) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; - REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ + REF_TRANSPORT(t, "reading_action"); /* matches unref inside reading_action */ gpr_slice_buffer_addn(&t->read_buffer, slices, nslices); - recv_data(exec_ctx, t, 1); + reading_action(exec_ctx, t, 1); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 98cd38abd4..8c4fa2d34a 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -236,9 +236,6 @@ struct grpc_chttp2_transport_parsing { /** was a goaway frame received? */ uint8_t goaway_received; - /** the last sent max_table_size setting */ - uint32_t last_sent_max_table_size; - /** initial window change */ int64_t initial_window_update; @@ -272,6 +269,9 @@ struct grpc_chttp2_transport_parsing { uint32_t incoming_frame_size; uint32_t incoming_stream_id; + /* current max frame size */ + uint32_t max_frame_size; + /* active parser */ void *parser_data; grpc_chttp2_stream_parsing *incoming_stream; @@ -282,6 +282,8 @@ struct grpc_chttp2_transport_parsing { /* received settings */ uint32_t settings[GRPC_CHTTP2_NUM_SETTINGS]; + /* last settings that were sent */ + uint32_t last_sent_settings[GRPC_CHTTP2_NUM_SETTINGS]; /* goaway data */ grpc_status_code goaway_error; @@ -291,27 +293,45 @@ struct grpc_chttp2_transport_parsing { int64_t outgoing_window; }; +typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, void *arg); + +typedef struct grpc_chttp2_executor_action_header { + grpc_chttp2_stream *stream; + grpc_chttp2_locked_action action; + struct grpc_chttp2_executor_action_header *next; + void *arg; +} grpc_chttp2_executor_action_header; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ - grpc_endpoint *ep; gpr_refcount refs; + grpc_endpoint *ep; char *peer_string; /** when this drops to zero it's safe to shutdown the endpoint */ gpr_refcount shutdown_ep_refs; - gpr_mu mu; + struct { + gpr_mu mu; + + /** is a thread currently in the global lock */ + bool global_active; + /** is a thread currently writing */ + bool writing_active; + /** is a thread currently parsing */ + bool parsing_active; + + grpc_chttp2_executor_action_header *pending_actions_head; + grpc_chttp2_executor_action_header *pending_actions_tail; + } executor; /** is the transport destroying itself? */ uint8_t destroying; /** has the upper layer closed the transport? */ uint8_t closed; - /** is a thread currently writing */ - uint8_t writing_active; - /** is a thread currently parsing */ - uint8_t parsing_active; - /** is there a read request to the endpoint outstanding? */ uint8_t endpoint_reading; @@ -338,8 +358,10 @@ struct grpc_chttp2_transport { /** closure to execute writing */ grpc_closure writing_action; - /** closure to finish reading from the endpoint */ - grpc_closure recv_data; + /** closure to start reading from the endpoint */ + grpc_closure reading_action; + /** closure to actually do parsing */ + grpc_closure parsing_action; /** incoming read bytes */ gpr_slice_buffer read_buffer; @@ -397,21 +419,26 @@ typedef struct { grpc_transport_stream_stats *collecting_stats; grpc_transport_stream_stats stats; + /** number of streams that are currently being read */ + gpr_refcount active_streams; + /** when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ - uint8_t write_closed; + bool write_closed; /** is this stream reading half-closed (boolean) */ - uint8_t read_closed; + bool read_closed; + /** are all published incoming byte streams closed */ + bool all_incoming_byte_streams_finished; /** is this stream in the stream map? (boolean) */ - uint8_t in_stream_map; + bool in_stream_map; /** has this stream seen an error? if 1, then pending incoming frames can be thrown away */ - uint8_t seen_error; + bool seen_error; - uint8_t published_initial_metadata; - uint8_t published_trailing_metadata; - uint8_t faked_trailing_metadata; + bool published_initial_metadata; + bool published_trailing_metadata; + bool faked_trailing_metadata; grpc_chttp2_incoming_metadata_buffer received_initial_metadata; grpc_chttp2_incoming_metadata_buffer received_trailing_metadata; @@ -570,6 +597,9 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( void grpc_chttp2_list_add_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +bool grpc_chttp2_list_remove_check_read_ops( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); int grpc_chttp2_list_pop_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); @@ -645,6 +675,12 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure, int success); +void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *transport, + grpc_chttp2_stream *optional_stream, + grpc_chttp2_locked_action action, + void *arg, size_t sizeof_arg); + #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index e827a43f7a..2995066e51 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -79,9 +79,12 @@ void grpc_chttp2_prepare_to_read( GPR_TIMER_BEGIN("grpc_chttp2_prepare_to_read", 0); transport_parsing->next_stream_id = transport_global->next_stream_id; - transport_parsing->last_sent_max_table_size = - transport_global->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]; + memcpy(transport_parsing->last_sent_settings, + transport_global->settings[GRPC_SENT_SETTINGS], + sizeof(transport_parsing->last_sent_settings)); + transport_parsing->max_frame_size = + transport_global->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; /* update the parsing view of incoming window */ while (grpc_chttp2_list_pop_unannounced_incoming_window_available( @@ -388,6 +391,12 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx, return 1; } goto dts_fh_0; /* loop */ + } else if (transport_parsing->incoming_frame_size > + transport_parsing->max_frame_size) { + gpr_log(GPR_DEBUG, "Frame size %d is larger than max frame size %d", + transport_parsing->incoming_frame_size, + transport_parsing->max_frame_size); + return 0; } if (++cur == end) { return 1; @@ -840,7 +849,11 @@ static int init_settings_frame_parser( transport_parsing->settings_ack_received = 1; grpc_chttp2_hptbl_set_max_bytes( &transport_parsing->hpack_parser.table, - transport_parsing->last_sent_max_table_size); + transport_parsing + ->last_sent_settings[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); + transport_parsing->max_frame_size = + transport_parsing + ->last_sent_settings[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; } transport_parsing->parser = grpc_chttp2_settings_parser_parse; transport_parsing->parser_data = &transport_parsing->simple.settings; diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index e5b35aadca..8f3ab00e6d 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -305,6 +305,14 @@ void grpc_chttp2_list_add_check_read_ops( GRPC_CHTTP2_LIST_CHECK_READ_OPS); } +bool grpc_chttp2_list_remove_check_read_ops( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_CHECK_READ_OPS); +} + int grpc_chttp2_list_pop_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index cedc8ade82..81135d2fc0 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -214,14 +214,16 @@ void grpc_call_stack_ignore_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_pops *pops) {} -void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) { +void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, + void *and_free_memory) { grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); size_t count = stack->count; size_t i; /* destroy per-filter data */ for (i = 0; i < count; i++) { - elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]); + elems[i].filter->destroy_call_elem(exec_ctx, &elems[i], + i == count - 1 ? and_free_memory : NULL); } } diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index c9d13ef193..7be86f788c 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -105,8 +105,12 @@ typedef struct { void (*set_pollset_or_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_pops *pops); /* Destroy per call data. - The filter does not need to do any chaining */ - void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + The filter does not need to do any chaining. + The bottom filter of a stack will be passed a non-NULL pointer to + \a and_free_memory that should be passed to gpr_free when destruction + is complete. */ + void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory); /* sizeof(per channel data) */ size_t sizeof_channel_data; @@ -225,7 +229,8 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, #endif /* Destroy a call stack */ -void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack); +void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, + void *and_free_memory); /* Ignore set pollset{_set} - used by filters if they don't care about pollsets * at all. Does nothing. */ diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index d7c835bc01..61379d1601 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -47,6 +47,8 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" +int grpc_compress_filter_trace = 0; + typedef struct call_data { gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; @@ -169,9 +171,29 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, did_compress = grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { + if (grpc_compress_filter_trace) { + char *algo_name; + const size_t before_size = calld->slices.length; + const size_t after_size = tmp.length; + const float savings_ratio = 1.0f - (float)after_size / (float)before_size; + GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, + &algo_name)); + gpr_log(GPR_DEBUG, + "Compressed[%s] %d bytes vs. %d bytes (%.2f%% savings)", + algo_name, before_size, after_size, 100 * savings_ratio); + } gpr_slice_buffer_swap(&calld->slices, &tmp); calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; + } else { + if (grpc_compress_filter_trace) { + char *algo_name; + GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, + &algo_name)); + gpr_log(GPR_DEBUG, "Algorithm '%s' enabled but decided not to compress.", + algo_name); + } } + gpr_slice_buffer_destroy(&tmp); grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, @@ -246,8 +268,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; gpr_slice_buffer_destroy(&calld->slices); diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h index 0d973329c4..cf5879d82e 100644 --- a/src/core/lib/channel/compress_filter.h +++ b/src/core/lib/channel/compress_filter.h @@ -38,6 +38,8 @@ #define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request" +extern int grpc_compress_filter_trace; + /** Compression filter for outgoing data. * * See <grpc/compression.h> for the available compression settings. diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 314ff6e5ce..6e857ae61a 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -103,12 +103,13 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_transport_destroy_stream(exec_ctx, chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld)); + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + and_free_memory); } /* Constructor for channel_data */ diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 740c302f83..c5dcd3ea68 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -155,8 +155,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) {} static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) { unsigned i; diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index 9803cfab87..cf2057838d 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -225,8 +225,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) {} /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 0eb95a2e09..7df1751352 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -44,7 +44,6 @@ static const grpc_event_engine_vtable *g_event_engine; grpc_poll_function_type grpc_poll_function = poll; -grpc_wakeup_fd grpc_global_wakeup_fd; void grpc_event_engine_init(void) { if ((g_event_engine = grpc_init_poll_and_epoll_posix())) { diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 146663984d..60cef8ba77 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -166,8 +166,10 @@ bool grpc_iomgr_abort_on_leaks(void) { if (env == NULL) return false; static const char *truthy[] = {"yes", "Yes", "YES", "true", "True", "TRUE", "1"}; + bool should_we = false; for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { - if (0 == strcmp(env, truthy[i])) return true; + if (0 == strcmp(env, truthy[i])) should_we = true; } - return false; + gpr_free(env); + return should_we; } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index 7d78beb15a..66f9ff7a46 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -63,39 +63,45 @@ typedef struct { grpc_endpoint **endpoint; } async_connect; -static void async_connect_unlock_and_cleanup(async_connect *ac) { +static void async_connect_unlock_and_cleanup(async_connect *ac, + grpc_winsocket *socket) { int done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (done) { - if (ac->socket != NULL) grpc_winsocket_destroy(ac->socket); gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_name); gpr_free(ac); } + if (socket != NULL) grpc_winsocket_destroy(socket); } static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool occured) { async_connect *ac = acp; gpr_mu_lock(&ac->mu); - /* If the alarm didn't occur, it got cancelled. */ - if (ac->socket != NULL && occured) { + if (ac->socket != NULL) { grpc_winsocket_shutdown(ac->socket); } - async_connect_unlock_and_cleanup(ac); + async_connect_unlock_and_cleanup(ac, ac->socket); } static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) { async_connect *ac = acp; SOCKET sock = ac->socket->socket; grpc_endpoint **ep = ac->endpoint; + GPR_ASSERT(*ep == NULL); grpc_winsocket_callback_info *info = &ac->socket->write_info; grpc_closure *on_done = ac->on_done; + gpr_mu_lock(&ac->mu); + grpc_winsocket *socket = ac->socket; + ac->socket = NULL; + gpr_mu_unlock(&ac->mu); + grpc_timer_cancel(exec_ctx, &ac->alarm); gpr_mu_lock(&ac->mu); - if (from_iocp) { + if (from_iocp && socket != NULL) { DWORD transfered_bytes = 0; DWORD flags; BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, @@ -107,12 +113,12 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) { ac->addr_name, utf8_message); gpr_free(utf8_message); } else { - *ep = grpc_tcp_create(ac->socket, ac->addr_name); - ac->socket = NULL; + *ep = grpc_tcp_create(socket, ac->addr_name); + socket = NULL; } } - async_connect_unlock_and_cleanup(ac); + async_connect_unlock_and_cleanup(ac, socket); /* If the connection was aborted, the callback was already called when the deadline was met. */ on_done->cb(exec_ctx, on_done->cb_arg, *ep != NULL); @@ -138,6 +144,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, const char *message = NULL; char *utf8_message; grpc_winsocket_callback_info *info; + int last_error; *endpoint = NULL; @@ -208,8 +215,10 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, return; failure: - utf8_message = gpr_format_message(WSAGetLastError()); + last_error = WSAGetLastError(); + utf8_message = gpr_format_message(last_error); gpr_log(GPR_ERROR, message, utf8_message); + gpr_log(GPR_ERROR, "last error = %d", last_error); gpr_free(utf8_message); if (socket != NULL) { grpc_winsocket_destroy(socket); diff --git a/src/core/lib/security/client_auth_filter.c b/src/core/lib/security/client_auth_filter.c index 1511659dfe..ea50c8513e 100644 --- a/src/core/lib/security/client_auth_filter.c +++ b/src/core/lib/security/client_auth_filter.c @@ -278,8 +278,8 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) { call_data *calld = elem->call_data; grpc_call_credentials_unref(calld->creds); if (calld->host != NULL) { diff --git a/src/core/lib/security/server_auth_filter.c b/src/core/lib/security/server_auth_filter.c index 99a428f342..845320975d 100644 --- a/src/core/lib/security/server_auth_filter.c +++ b/src/core/lib/security/server_auth_filter.c @@ -221,8 +221,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) {} /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 4b5a7d7465..cd6c1585ec 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -390,8 +390,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c)); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); gpr_mu_destroy(&c->mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (c->status[i].details) { @@ -410,7 +408,9 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } grpc_pops_destroy(c->pops); - gpr_free(c); + grpc_channel *channel = c->channel; + grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); GPR_TIMER_END("destroy_call", 0); } @@ -1535,3 +1535,39 @@ grpc_compression_algorithm grpc_call_compression_for_level( gpr_mu_unlock(&call->mu); return grpc_compression_algorithm_for_level(level, accepted_encodings); } + +const char *grpc_call_error_to_string(grpc_call_error error) { + switch (error) { + case GRPC_CALL_ERROR: + return "GRPC_CALL_ERROR"; + case GRPC_CALL_ERROR_ALREADY_ACCEPTED: + return "GRPC_CALL_ERROR_ALREADY_ACCEPTED"; + case GRPC_CALL_ERROR_ALREADY_FINISHED: + return "GRPC_CALL_ERROR_ALREADY_FINISHED"; + case GRPC_CALL_ERROR_ALREADY_INVOKED: + return "GRPC_CALL_ERROR_ALREADY_INVOKED"; + case GRPC_CALL_ERROR_BATCH_TOO_BIG: + return "GRPC_CALL_ERROR_BATCH_TOO_BIG"; + case GRPC_CALL_ERROR_INVALID_FLAGS: + return "GRPC_CALL_ERROR_INVALID_FLAGS"; + case GRPC_CALL_ERROR_INVALID_MESSAGE: + return "GRPC_CALL_ERROR_INVALID_MESSAGE"; + case GRPC_CALL_ERROR_INVALID_METADATA: + return "GRPC_CALL_ERROR_INVALID_METADATA"; + case GRPC_CALL_ERROR_NOT_INVOKED: + return "GRPC_CALL_ERROR_NOT_INVOKED"; + case GRPC_CALL_ERROR_NOT_ON_CLIENT: + return "GRPC_CALL_ERROR_NOT_ON_CLIENT"; + case GRPC_CALL_ERROR_NOT_ON_SERVER: + return "GRPC_CALL_ERROR_NOT_ON_SERVER"; + case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE: + return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE"; + case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH: + return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH"; + case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: + return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS"; + case GRPC_CALL_OK: + return "GRPC_CALL_OK"; + } + GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW"); +} diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 5ec8808b50..1f82c3bad2 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -227,6 +227,10 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, #endif GPR_TIMER_BEGIN("grpc_cq_end_op", 0); + GRPC_API_TRACE( + "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, success=%d, done=%p, " + "done_arg=%p, storage=%p)", + 7, (exec_ctx, cc, tag, success, done, done_arg, storage)); storage->tag = tag; storage->done = done; diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 03f379aba8..57c6897626 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -164,10 +164,10 @@ void grpc_init(void) { grpc_register_tracer("channel_stack_builder", &grpc_trace_channel_stack_builder); grpc_register_tracer("http1", &grpc_http1_trace); + grpc_register_tracer("compression", &grpc_compress_filter_trace); grpc_security_pre_init(); grpc_iomgr_init(); grpc_executor_init(); - grpc_tracer_init("GRPC_TRACE"); gpr_timers_global_init(); grpc_cq_global_init(); for (i = 0; i < g_number_of_plugins; i++) { @@ -179,6 +179,7 @@ void grpc_init(void) { * at the appropriate time */ grpc_register_security_filters(); register_builtin_channel_init(); + grpc_tracer_init("GRPC_TRACE"); /* no more changes to channel init pipelines */ grpc_channel_init_finalize(); } diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 62a816c26d..313b846266 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -107,8 +107,10 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) {} -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) {} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *and_free_memory) { + gpr_free(and_free_memory); +} static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index aff01f8ae4..aafbc988ce 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -820,8 +820,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, server_ref(chand->server); } -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + void *ignored) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index ba39693e47..fdde376a09 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -141,8 +141,9 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport, void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream) { - transport->vtable->destroy_stream(exec_ctx, transport, stream); + grpc_stream *stream, void *and_free_memory) { + transport->vtable->destroy_stream(exec_ctx, transport, stream, + and_free_memory); } char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 0be080c9ca..6948e9de3d 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -99,6 +99,11 @@ void grpc_transport_move_stats(grpc_transport_stream_stats *from, /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op { + /** Should be enqueued when all requested operations (excluding recv_message + and recv_initial_metadata which have their own closures) in a given batch + have been completed. */ + grpc_closure *on_complete; + /** Send initial metadata to the peer, from the provided metadata batch. idempotent_request MUST be set if this is non-null */ grpc_metadata_batch *send_initial_metadata; @@ -130,11 +135,6 @@ typedef struct grpc_transport_stream_op { /** Collect any stats into provided buffer, zero internal stat counters */ grpc_transport_stream_stats *collect_stats; - /** Should be enqueued when all requested operations (excluding recv_message - and recv_initial_metadata which have their own closures) in a given batch - have been completed. */ - grpc_closure *on_complete; - /** If != GRPC_STATUS_OK, cancel this stream */ grpc_status_code cancel_with_status; @@ -213,7 +213,7 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport, caller, but any child memory must be cleaned up) */ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream); + grpc_stream *stream, void *and_free_memory); void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op); diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index 78fac88e4f..fc7140671b 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -67,7 +67,7 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream); + grpc_stream *stream, void *and_free_memory); /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self); |