From 8ae0ff0af2dc27ee82dd0e7174d4ad3112e6cf26 Mon Sep 17 00:00:00 2001
From: "Mark D. Roth"
Date: Fri, 8 Jun 2018 08:37:23 -0700
Subject: Move recv_trailing_metadata into its own callback.

Don't use on_complete for recv_ops.
---
 src/core/lib/iomgr/call_combiner.h | 80 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 80 insertions(+)

(limited to 'src/core/lib/iomgr/call_combiner.h')

diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index 0ccd08ea57..f9ce29f231 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -26,6 +26,7 @@
 #include <grpc/support/atm.h>
 
 #include "src/core/lib/gpr/mpscq.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
 #include "src/core/lib/iomgr/closure.h"
 
 // A simple, lock-free mechanism for serializing activity related to a
@@ -109,4 +110,83 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
 void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
                                grpc_error* error);
 
+namespace grpc_core {
+
+// Helper for running a list of closures in a call combiner.
+//
+// Each callback running in the call combiner will eventually be
+// returned to the surface, at which point the surface will yield the
+// call combiner. So when we are running in the call combiner and have
+// more than one callback to return to the surface, we need to re-enter
+// the call combiner for all but one of those callbacks.
+class CallCombinerClosureList {
+ public:
+  CallCombinerClosureList() {}
+
+  // Adds a closure to the list. The closure must eventually result in
+  // the call combiner being yielded.
+  void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
+    closures_.emplace_back(closure, error, reason);
+  }
+
+  // Runs all closures in the call combiner and yields the call combiner.
+  //
+  // All but one of the closures in the list will be scheduled via
+  // GRPC_CALL_COMBINER_START(), and the remaining closure will be
+  // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
+  // yielding the call combiner. If the list is empty, then the call
+  // combiner will be yielded immediately.
+  void RunClosures(grpc_call_combiner* call_combiner) {
+    for (size_t i = 1; i < closures_.size(); ++i) {
+      auto& closure = closures_[i];
+      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+                               closure.reason);
+    }
+    if (closures_.size() > 0) {
+      if (grpc_call_combiner_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "CallCombinerClosureList executing closure while already "
+                "holding call_combiner %p: closure=%p error=%s reason=%s",
+                call_combiner, closures_[0].closure,
+                grpc_error_string(closures_[0].error), closures_[0].reason);
+      }
+      // This will release the call combiner.
+      GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
+    } else {
+      GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
+    }
+    closures_.clear();
+  }
+
+  // Runs all closures in the call combiner, but does NOT yield the call
+  // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
+  void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
+    for (size_t i = 0; i < closures_.size(); ++i) {
+      auto& closure = closures_[i];
+      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+                               closure.reason);
+    }
+    closures_.clear();
+  }
+
+  size_t size() const { return closures_.size(); }
+
+ private:
+  struct CallCombinerClosure {
+    grpc_closure* closure;
+    grpc_error* error;
+    const char* reason;
+
+    CallCombinerClosure(grpc_closure* closure, grpc_error* error,
+                        const char* reason)
+        : closure(closure), error(error), reason(reason) {}
+  };
+
+  // There are generally a maximum of 6 closures to run in the call
+  // combiner, one for each pending op.
+  InlinedVector<CallCombinerClosure, 6> closures_;
+};
+
+}  // namespace grpc_core
+
 #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
--
cgit v1.2.3
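
Usage note (not part of the patch): the sketch below shows how a filter that is
holding the call combiner might hand several completed callbacks back to the
surface with CallCombinerClosureList. It is a minimal sketch that assumes it is
compiled inside gRPC core against this header; the CallData struct,
ReturnResultsToSurface(), and the two closure fields are hypothetical
placeholders, not names taken from this commit.

#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/error.h"

namespace {

// Hypothetical per-call filter state: the call combiner is borrowed from the
// call stack, and the two closures are callbacks owned by the surface.
struct CallData {
  grpc_call_combiner* call_combiner;
  grpc_closure* recv_message_ready;
  grpc_closure* on_complete;
};

// Runs while holding the call combiner, once both results are ready.
void ReturnResultsToSurface(CallData* calld, grpc_error* error) {
  grpc_core::CallCombinerClosureList closures;
  // Each closure added here must eventually yield the call combiner.
  closures.Add(calld->recv_message_ready, GRPC_ERROR_REF(error),
               "returning recv_message_ready to surface");
  closures.Add(calld->on_complete, GRPC_ERROR_REF(error),
               "returning on_complete to surface");
  // All but one closure re-enters the call combiner via
  // GRPC_CALL_COMBINER_START(); the remaining one is scheduled directly and
  // releases the combiner when it runs.
  closures.RunClosures(calld->call_combiner);
  GRPC_ERROR_UNREF(error);
}

}  // namespace

If the caller instead needs to keep holding the call combiner, for example
because another callback is still pending, RunClosuresWithoutYielding()
schedules every closure through GRPC_CALL_COMBINER_START() and leaves the
combiner held.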