From eaddd597d75cbf05835318b8d047654eb8b37e72 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 26 Apr 2018 14:31:45 -0700 Subject: Add combiner_run to run closures immediately if we already have the combiner lock --- src/core/ext/filters/client_channel/subchannel.cc | 2 +- .../transport/chttp2/transport/chttp2_transport.cc | 14 ++++---- src/core/lib/iomgr/combiner.cc | 39 ++++++++++++++++++++-- src/core/lib/iomgr/tcp_posix.cc | 4 +-- 4 files changed, 47 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index d7815fb7e1..450e684273 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -408,7 +408,7 @@ static void on_external_state_watcher_done(void* arg, grpc_error* error) { gpr_mu_unlock(&w->subchannel->mu); GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher"); gpr_free(w); - GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); } static void on_alarm(void* arg, grpc_error* error) { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 0ef73961a5..eb16c1b910 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1665,8 +1665,8 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { static void send_ping_locked(grpc_chttp2_transport* t, grpc_closure* on_initiate, grpc_closure* on_ack) { if (t->closed_with_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(on_initiate, GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(on_ack, GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; @@ -1683,16 +1683,16 @@ static void send_ping_locked(grpc_chttp2_transport* t, */ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { if (t->closed_with_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, - GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked, - GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, + GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked, + GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) { /* There is a ping in flight. Add yourself to the inflight closure list. */ - GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE); grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT], &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE); return; diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 9429842eb8..c11dd65f5a 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -63,13 +63,15 @@ struct grpc_combiner { gpr_refcount refs; }; +static void combiner_run(grpc_closure* closure, grpc_error* error); static void combiner_exec(grpc_closure* closure, grpc_error* error); +static void combiner_finally_run(grpc_closure* closure, grpc_error* error); static void combiner_finally_exec(grpc_closure* closure, grpc_error* error); static const grpc_closure_scheduler_vtable scheduler = { - combiner_exec, combiner_exec, "combiner:immediately"}; + combiner_run, combiner_exec, "combiner:immediately"}; static const grpc_closure_scheduler_vtable finally_scheduler = { - combiner_finally_exec, combiner_finally_exec, "combiner:finally"}; + combiner_finally_run, combiner_finally_exec, "combiner:finally"}; static void offload(void* arg, grpc_error* error); @@ -343,6 +345,39 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) { grpc_closure_list_append(&lock->final_list, closure, error); } +static void combiner_run(grpc_closure* closure, grpc_error* error) { +#ifndef NDEBUG + closure->scheduled = false; + grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler); + GRPC_COMBINER_TRACE(gpr_log( + GPR_DEBUG, + "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]", + lock, closure, closure->file_created, closure->line_created, + closure->file_initiated, closure->line_initiated)); + GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == + lock); +#endif + closure->cb(closure->cb_arg, error); + GRPC_ERROR_UNREF(error); +} + +static void combiner_finally_run(grpc_closure* closure, grpc_error* error) { +#ifndef NDEBUG + closure->scheduled = false; + grpc_combiner* lock = + COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler); + GRPC_COMBINER_TRACE(gpr_log( + GPR_DEBUG, + "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]", + lock, closure, closure->file_created, closure->line_created, + closure->file_initiated, closure->line_initiated)); + GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == + lock); +#endif + closure->cb(closure->cb_arg, error); + GRPC_ERROR_UNREF(error); +} + static void enqueue_finally(void* closure, grpc_error* error) { combiner_finally_exec(static_cast(closure), GRPC_ERROR_REF(error)); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 153be05e83..b79ffe20f1 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -366,7 +366,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) { tcp->read_cb = nullptr; tcp->incoming_buffer = nullptr; - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); } #define MAX_READ_IOVEC 4 @@ -629,7 +629,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { gpr_log(GPR_INFO, "write: %s", str); } - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); TCP_UNREF(tcp, "write"); } } -- cgit v1.2.3