From dfd3a8f7a566aaf59b676caf583d4048b8e9ab5b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 24 Aug 2016 09:43:45 -0700 Subject: Merge combiner and exec_ctx execution better Allows exec_ctx callbacks to be called while a combiner is executing. Also allows guaranteeing direct execution of callbacks from combiners, which should allow reducing cpu burn for up/down stack interactions in the future. --- .../transport/chttp2/transport/chttp2_transport.c | 59 +++-- src/core/ext/transport/chttp2/transport/internal.h | 19 ++ .../ext/transport/chttp2/transport/stream_lists.c | 2 + src/core/lib/iomgr/combiner.c | 278 ++++++++++----------- src/core/lib/iomgr/combiner.h | 2 + src/core/lib/iomgr/exec_ctx.c | 33 ++- src/core/lib/iomgr/exec_ctx.h | 3 +- src/core/lib/profiling/timers.h | 5 + src/core/lib/transport/transport.c | 25 ++ src/core/lib/transport/transport.h | 4 + test/core/end2end/tests/filter_causes_close.c | 7 +- 11 files changed, 243 insertions(+), 194 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 520fa57c35..19e988670c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -179,33 +179,30 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, gpr_free(t); } -/*#define REFCOUNTING_DEBUG 1*/ -#ifdef REFCOUNTING_DEBUG -#define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__) -#define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__) -static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - const char *reason, const char *file, int line) { - gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count, - t->refs.count - 1, reason, file, line); +#ifdef GRPC_CHTTP2_REFCOUNTING_DEBUG +void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, const char *reason, + const char *file, int line) { + gpr_log(GPR_DEBUG, "chttp2:unref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]", t, + t->refs.count, t->refs.count - 1, reason, file, line); if (!gpr_unref(&t->refs)) return; destruct_transport(exec_ctx, t); } -static void ref_transport(grpc_chttp2_transport *t, const char *reason, - const char *file, int line) { - gpr_log(GPR_DEBUG, "chttp2: ref:%p %d->%d %s [%s:%d]", t, t->refs.count, - t->refs.count + 1, reason, file, line); +void grpc_chttp2_ref_transport(grpc_chttp2_transport *t, const char *reason, + const char *file, int line) { + gpr_log(GPR_DEBUG, "chttp2: ref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]", t, + t->refs.count, t->refs.count + 1, reason, file, line); gpr_ref(&t->refs); } #else -#define REF_TRANSPORT(t, r) ref_transport(t) -#define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t) -static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { +void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { if (!gpr_unref(&t->refs)) return; destruct_transport(exec_ctx, t); } -static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } +void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } #endif static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -392,7 +389,7 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_chttp2_transport *t = tp; t->destroying = 1; drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); - UNREF_TRANSPORT(exec_ctx, t, "destroy"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destroy"); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { @@ -482,7 +479,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_slice_buffer_init(&s->writing.flow_controlled_buffer); s->global.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); - REF_TRANSPORT(t, "stream"); + GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); if (server_data) { s->global.id = (uint32_t)(uintptr_t)server_data; @@ -547,7 +544,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GRPC_ERROR_UNREF(s->global.read_closed_error); GRPC_ERROR_UNREF(s->global.write_closed_error); - UNREF_TRANSPORT(exec_ctx, t, "stream"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); GPR_TIMER_END("destroy_stream", 0); @@ -632,6 +629,7 @@ static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_chttp2_transport *t = tp; t->executor.check_read_ops_scheduled = false; check_read_ops(exec_ctx, &t->global); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "initiate_read_flush_locked"); } /******************************************************************************* @@ -667,7 +665,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, break; case GRPC_CHTTP2_WRITING_INACTIVE: set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, reason); - REF_TRANSPORT(t, "writing"); + GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, &t->initiate_writing, GRPC_ERROR_NONE, covered_by_poller); @@ -714,7 +712,7 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { "start_writing:nothing_to_write"); } end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE); - UNREF_TRANSPORT(exec_ctx, t, "writing"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); } GPR_TIMER_END("start_writing", 0); } @@ -787,7 +785,7 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp, case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: GPR_TIMER_MARK("state=writing_stale_with_poller", 0); set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing"); - REF_TRANSPORT(t, "writing"); + GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, &t->initiate_writing, GRPC_ERROR_NONE, true); @@ -795,14 +793,14 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp, case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: GPR_TIMER_MARK("state=writing_stale_no_poller", 0); set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing"); - REF_TRANSPORT(t, "writing"); + GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, &t->initiate_writing, GRPC_ERROR_NONE, false); break; } - UNREF_TRANSPORT(exec_ctx, t, "writing"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); GPR_TIMER_END("terminate_writing_with_lock", 0); } @@ -1261,7 +1259,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); - UNREF_TRANSPORT(exec_ctx, t, "transport_op"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "transport_op"); } static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, @@ -1270,7 +1268,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, op->transport_private.args[0] = gt; grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked, op); - REF_TRANSPORT(t, "transport_op"); + GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); grpc_combiner_execute(exec_ctx, t->executor.combiner, &op->transport_private.closure, GRPC_ERROR_NONE); } @@ -1864,7 +1862,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp, } } else if (!t->closed) { keep_reading = true; - REF_TRANSPORT(t, "keep_reading"); + GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading"); prevent_endpoint_shutdown(t); } gpr_slice_buffer_reset_and_unref(&t->read_buffer); @@ -1872,9 +1870,9 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action); allow_endpoint_shutdown_locked(exec_ctx, t); - UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { - UNREF_TRANSPORT(exec_ctx, t, "reading_action"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); } GPR_TIMER_END("post_reading_action_locked", 0); @@ -2247,7 +2245,8 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_transport *transport, gpr_slice_buffer *read_buffer) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; - REF_TRANSPORT(t, "reading_action"); /* matches unref inside reading_action */ + GRPC_CHTTP2_REF_TRANSPORT( + t, "reading_action"); /* matches unref inside reading_action */ if (read_buffer != NULL) { gpr_slice_buffer_move_into(read_buffer, &t->read_buffer); gpr_free(read_buffer); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 1b48b82f4f..761ed2dad1 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -726,6 +726,25 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global); #endif +//#define GRPC_CHTTP2_REFCOUNTING_DEBUG 1 +#ifdef GRPC_CHTTP2_REFCOUNTING_DEBUG +#define GRPC_CHTTP2_REF_TRANSPORT(t, r) \ + grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__) +#define GRPC_CHTTP2_UNREF_TRANSPORT(cl, t, r) \ + grpc_chttp2_unref_transport(cl, t, r, __FILE__, __LINE__) +void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, const char *reason, + const char *file, int line); +void grpc_chttp2_ref_transport(grpc_chttp2_transport *t, const char *reason, + const char *file, int line); +#else +#define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t) +#define GRPC_CHTTP2_UNREF_TRANSPORT(cl, t, r) grpc_chttp2_unref_transport(cl, t) +void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); +void grpc_chttp2_ref_transport(grpc_chttp2_transport *t); +#endif + grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, uint32_t frame_size, diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 4ba09087f9..6d4863c4aa 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -245,6 +245,8 @@ void grpc_chttp2_list_add_check_read_ops( grpc_chttp2_stream_global *stream_global) { grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); if (!t->executor.check_read_ops_scheduled) { + GRPC_CHTTP2_REF_TRANSPORT(TRANSPORT_FROM_GLOBAL(transport_global), + "initiate_read_flush_locked"); grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, &t->initiate_read_flush_locked, GRPC_ERROR_NONE, false); diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 946cfc65fc..f1a2b29519 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -51,6 +51,7 @@ int grpc_combiner_trace = 0; } while (0) struct grpc_combiner { + grpc_combiner *next_combiner_on_this_exec_ctx; grpc_workqueue *optional_workqueue; gpr_mpscq queue; // state is: @@ -58,17 +59,23 @@ struct grpc_combiner { // other bits - number of items queued on the lock gpr_atm state; bool take_async_break_before_final_list; + bool time_to_execute_final_list; grpc_closure_list final_list; - grpc_closure continue_finishing; + grpc_closure offload; }; +static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); + grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); + lock->next_combiner_on_this_exec_ctx = NULL; + lock->time_to_execute_final_list = false; lock->optional_workqueue = optional_workqueue; gpr_atm_no_barrier_store(&lock->state, 1); gpr_mpscq_init(&lock->queue); lock->take_async_break_before_final_list = false; grpc_closure_list_init(&lock->final_list); + grpc_closure_init(&lock->offload, offload, lock); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } @@ -90,177 +97,154 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } } -static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); -static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); +static void queue_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + lock->next_combiner_on_this_exec_ctx = NULL; + if (exec_ctx->active_combiner == NULL) { + exec_ctx->active_combiner = exec_ctx->last_combiner = lock; + } else { + exec_ctx->last_combiner->next_combiner_on_this_exec_ctx = lock; + exec_ctx->last_combiner = lock; + } +} -static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - GPR_TIMER_BEGIN("combiner.continue_executing_mainline", 0); - grpc_combiner *lock = arg; +void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, + grpc_closure *cl, grpc_error *error) { GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p continue_finishing_mainline", lock)); - GPR_ASSERT(exec_ctx->active_combiner == NULL); - exec_ctx->active_combiner = lock; - if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock); - GPR_ASSERT(exec_ctx->active_combiner == lock); - exec_ctx->active_combiner = NULL; - GPR_TIMER_END("combiner.continue_executing_mainline", 0); + gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl)); + GPR_TIMER_BEGIN("combiner.execute", 0); + gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); + GPR_ASSERT(last & 1); // ensure lock has not been destroyed + cl->error = error; + gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); + if (last == 1) { + // code will be written when the exec_ctx calls + // grpc_combiner_continue_exec_ctx + queue_on_exec_ctx(exec_ctx, lock); + } + GPR_TIMER_END("combiner.execute", 0); } -static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - GPR_TIMER_BEGIN("combiner.execute_final", 0); - grpc_closure *c = lock->final_list.head; - GPR_ASSERT(c != NULL); - grpc_closure_list_init(&lock->final_list); - lock->take_async_break_before_final_list = false; - int loops = 0; - while (c != NULL) { - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error; - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - loops++; +static void move_next(grpc_exec_ctx *exec_ctx) { + exec_ctx->active_combiner = + exec_ctx->active_combiner->next_combiner_on_this_exec_ctx; + if (exec_ctx->active_combiner == NULL) { + exec_ctx->last_combiner = NULL; } - GPR_TIMER_END("combiner.execute_final", 0); } -static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - GPR_TIMER_BEGIN("combiner.continue_executing_final", 0); +static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_combiner *lock = arg; - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p continue_executing_final", lock)); - GPR_ASSERT(exec_ctx->active_combiner == NULL); - exec_ctx->active_combiner = lock; - // quick peek to see if new things have turned up on the queue: if so, go back - // to executing them before the final list - if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) { - if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock); - } else { - execute_final(exec_ctx, lock); - finish(exec_ctx, lock); - } - GPR_ASSERT(exec_ctx->active_combiner == lock); - exec_ctx->active_combiner = NULL; - GPR_TIMER_END("combiner.continue_executing_final", 0); + queue_on_exec_ctx(exec_ctx, lock); } -static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - GPR_TIMER_BEGIN("combiner.start_execute_final", 0); - GPR_ASSERT(exec_ctx->active_combiner == lock); - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, - "C:%p start_execute_final take_async_break_before_final_list=%d", - lock, lock->take_async_break_before_final_list)); - if (lock->take_async_break_before_final_list) { - grpc_closure_init(&lock->continue_finishing, continue_executing_final, - lock); - grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, - GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched")); - GPR_TIMER_END("combiner.start_execute_final", 0); +static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + move_next(exec_ctx); + grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload, + GRPC_ERROR_NONE); +} + +bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { + GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0); + grpc_combiner *lock = exec_ctx->active_combiner; + if (lock == NULL) { + GPR_TIMER_END("combiner.continue_exec_ctx", 0); return false; - } else { - execute_final(exec_ctx, lock); - GPR_TIMER_END("combiner.start_execute_final", 0); - return true; } -} -static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - GPR_TIMER_BEGIN("combiner.maybe_finish_one", 0); - GPR_ASSERT(exec_ctx->active_combiner == lock); if (lock->optional_workqueue != NULL && grpc_exec_ctx_ready_to_finish(exec_ctx)) { + GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on, and we have a workqueue (and so // can help the execution context out): schedule remaining work to be picked // up on the workqueue - grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline, - lock); - grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, - &lock->continue_finishing, GRPC_ERROR_NONE); - GPR_TIMER_END("combiner.maybe_finish_one", 0); - return false; - } - gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n)); - if (n == NULL) { - // queue is in an inconsistant state: use this as a cue that we should - // go off and do something else for a while (and come back later) - grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline, - lock); - grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, - GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched")); - GPR_TIMER_END("combiner.maybe_finish_one", 0); - return false; + queue_offload(exec_ctx, lock); + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + return true; } - grpc_closure *cl = (grpc_closure *)n; - grpc_error *error = cl->error; - cl->cb(exec_ctx, cl->cb_arg, error); - GRPC_ERROR_UNREF(error); - GPR_TIMER_END("combiner.maybe_finish_one", 0); - return true; -} -static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock); - GPR_TIMER_BEGIN("combiner.finish", 0); - int loops = 0; - do { - executor = maybe_finish_one; - gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2); - GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, - "C:%p finish[%d] old_state=%" PRIdPTR, lock, - loops, old_state)); - switch (old_state) { - case 5: // we're down to one queued item: if it's the final list we - case 4: // should do that - if (!grpc_closure_list_empty(lock->final_list)) { - executor = start_execute_final; - } - break; - case 3: // had one count, one unorphaned --> unlocked unorphaned - GPR_TIMER_END("combiner.finish", 0); - return; - case 2: // and one count, one orphaned --> unlocked and orphaned - really_destroy(exec_ctx, lock); - GPR_TIMER_END("combiner.finish", 0); - return; - case 1: - case 0: - // these values are illegal - representing an already unlocked or - // deleted lock - GPR_UNREACHABLE_CODE(return ); + if (!lock->time_to_execute_final_list || + // peek to see if something new has shown up, and execute that with + // priority + (gpr_atm_acq_load(&lock->state) >> 1) > 1) { + gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n)); + if (n == NULL) { + // queue is in an inconsistant state: use this as a cue that we should + // go off and do something else for a while (and come back later) + GPR_TIMER_MARK("delay_busy", 0); + if (lock->optional_workqueue != NULL) { + queue_offload(exec_ctx, lock); + } + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + return true; } - loops++; - } while (executor(exec_ctx, lock)); - GPR_TIMER_END("combiner.finish", 0); -} - -void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *cl, grpc_error *error) { - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl)); - GPR_TIMER_BEGIN("combiner.execute", 0); - gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); - GPR_ASSERT(last & 1); // ensure lock has not been destroyed - if (last == 1) { - exec_ctx->active_combiner = lock; - GPR_TIMER_BEGIN("combiner.execute_first_cb", 0); + GPR_TIMER_BEGIN("combiner.exec1", 0); + grpc_closure *cl = (grpc_closure *)n; + grpc_error *error = cl->error; cl->cb(exec_ctx, cl->cb_arg, error); - GPR_TIMER_END("combiner.execute_first_cb", 0); GRPC_ERROR_UNREF(error); - finish(exec_ctx, lock); - GPR_ASSERT(exec_ctx->active_combiner == lock); - exec_ctx->active_combiner = NULL; + GPR_TIMER_END("combiner.exec1", 0); } else { - cl->error = error; - gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); + if (lock->take_async_break_before_final_list) { + GPR_TIMER_MARK("async_break", 0); + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p take async break", lock)); + lock->take_async_break_before_final_list = false; + if (lock->optional_workqueue != NULL) { + queue_offload(exec_ctx, lock); + } + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + return true; + } else { + grpc_closure *c = lock->final_list.head; + GPR_ASSERT(c != NULL); + grpc_closure_list_init(&lock->final_list); + lock->take_async_break_before_final_list = false; + int loops = 0; + while (c != NULL) { + GPR_TIMER_BEGIN("combiner.exec_1final", 0); + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error; + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + GPR_TIMER_END("combiner.exec_1final", 0); + } + } } - GPR_TIMER_END("combiner.execute", 0); + + GPR_TIMER_MARK("unref", 0); + gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2); + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state)); + lock->time_to_execute_final_list = false; + switch (old_state) { + case 5: // we're down to one queued item: if it's the final list we + case 4: // should do that + if (!grpc_closure_list_empty(lock->final_list)) { + lock->time_to_execute_final_list = true; + } + break; + case 3: // had one count, one unorphaned --> unlocked unorphaned + move_next(exec_ctx); + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + return true; + case 2: // and one count, one orphaned --> unlocked and orphaned + move_next(exec_ctx); + really_destroy(exec_ctx, lock); + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + return true; + case 1: + case 0: + // these values are illegal - representing an already unlocked or + // deleted lock + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + GPR_UNREACHABLE_CODE(return true); + } + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + return true; } static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 3eb9f34638..28f548b2f5 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -64,6 +64,8 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, bool hint_async_break); void grpc_combiner_force_async_finally(grpc_combiner *lock); +bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx); + extern int grpc_combiner_trace; #endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */ diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 12e51ac092..747b462a6e 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -37,6 +37,7 @@ #include #include +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" @@ -60,20 +61,28 @@ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) { bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { bool did_something = 0; GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); - while (!grpc_closure_list_empty(exec_ctx->closure_list)) { - grpc_closure *c = exec_ctx->closure_list.head; - exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; - while (c != NULL) { - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error; - did_something = true; - GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0); - c = next; + for (;;) { + if (!grpc_closure_list_empty(exec_ctx->closure_list)) { + grpc_closure *c = exec_ctx->closure_list.head; + exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error; + did_something = true; + GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0); + c = next; + } + continue; + } + if (grpc_combiner_continue_exec_ctx(exec_ctx)) { + continue; } + break; } + GPR_ASSERT(exec_ctx->active_combiner == NULL); if (exec_ctx->stealing_from_workqueue != NULL) { if (grpc_exec_ctx_ready_to_finish(exec_ctx)) { grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue, diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index ac4674bbac..91029f5fba 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -70,6 +70,7 @@ struct grpc_exec_ctx { grpc_closure *stolen_closure; /** currently active combiner: updated only via combiner.c */ grpc_combiner *active_combiner; + grpc_combiner *last_combiner; bool cached_ready_to_finish; void *check_ready_to_finish_arg; bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); @@ -79,7 +80,7 @@ struct grpc_exec_ctx { prefer to use GRPC_EXEC_CTX_INIT whenever possible */ #define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ { \ - GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, false, finish_check_arg, \ + GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, NULL, false, finish_check_arg, \ finish_check \ } #else diff --git a/src/core/lib/profiling/timers.h b/src/core/lib/profiling/timers.h index 621cdbf656..ea0cbca977 100644 --- a/src/core/lib/profiling/timers.h +++ b/src/core/lib/profiling/timers.h @@ -34,6 +34,8 @@ #ifndef GRPC_CORE_LIB_PROFILING_TIMERS_H #define GRPC_CORE_LIB_PROFILING_TIMERS_H +#include + #ifdef __cplusplus extern "C" { #endif @@ -56,14 +58,17 @@ void gpr_timer_set_enabled(int enabled); /* No profiling. No-op all the things. */ #define GPR_TIMER_MARK(tag, important) \ do { \ + /*printf("- %s\n", tag);*/ \ } while (0) #define GPR_TIMER_BEGIN(tag, important) \ do { \ + /*printf("%s {\n", tag);*/ \ } while (0) #define GPR_TIMER_END(tag, important) \ do { \ + /*printf("} // %s\n", tag);*/ \ } while (0) #else /* at least one profiler requested... */ diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index a78ad4349a..b951218130 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -276,3 +276,28 @@ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) { op->op.on_consumed = &op->outer_on_complete; return &op->op; } + +typedef struct { + grpc_closure outer_on_complete; + grpc_closure *inner_on_complete; + grpc_transport_stream_op op; +} made_transport_stream_op; + +static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + made_transport_stream_op *op = arg; + grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error), + NULL); + gpr_free(op); +} + +grpc_transport_stream_op *grpc_make_transport_stream_op( + grpc_closure *on_complete) { + made_transport_stream_op *op = gpr_malloc(sizeof(*op)); + grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op, + op); + op->inner_on_complete = on_complete; + memset(&op->op, 0, sizeof(op->op)); + op->op.on_complete = &op->outer_on_complete; + return &op->op; +} diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index d0d0c2a461..2c1cc3ee42 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -292,6 +292,10 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, /* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to \a on_consumed and then delete the returned transport op */ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed); +/* Allocate a grpc_transport_stream_op, and preconfigure the on_consumed closure + to \a on_consumed and then delete the returned transport op */ +grpc_transport_stream_op *grpc_make_transport_stream_op( + grpc_closure *on_consumed); #ifdef __cplusplus } diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index c6c36d668b..ef1a9c4edb 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -211,11 +211,10 @@ static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg, // close the stream with an error. gpr_slice message = gpr_slice_from_copied_string("Failure that's not preventable."); - grpc_transport_stream_op op; - memset(&op, 0, sizeof(op)); - grpc_transport_stream_op_add_close(&op, GRPC_STATUS_PERMISSION_DENIED, + grpc_transport_stream_op *op = grpc_make_transport_stream_op(NULL); + grpc_transport_stream_op_add_close(op, GRPC_STATUS_PERMISSION_DENIED, &message); - grpc_call_next_op(exec_ctx, elem, &op); + grpc_call_next_op(exec_ctx, elem, op); } grpc_exec_ctx_sched( exec_ctx, calld->recv_im_ready, -- cgit v1.2.3