diff options
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 23 | ||||
-rwxr-xr-x | tools/run_tests/python_utils/jobset.py | 62 |
2 files changed, 53 insertions, 32 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 38eace12c7..b77a68eead 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -59,6 +59,11 @@ struct grpc_combiner { grpc_closure_scheduler scheduler; grpc_closure_scheduler finally_scheduler; gpr_mpscq queue; + // either: + // a pointer to the initiating exec ctx if that is the only exec_ctx that has + // ever queued to this combiner, or NULL. If this is non-null, it's not + // dereferencable (since the initiating exec_ctx may have gone out of scope) + gpr_atm initiating_exec_ctx_or_null; // state is: // lower bit - zero if orphaned (STATE_UNORPHANED) // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT) @@ -168,15 +173,25 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR, lock, cl, last)); - GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed - assert(cl->cb); - cl->error_data.error = error; - gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); if (last == 1) { + gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, + (gpr_atm)exec_ctx); // first element on this list: add it to the list of combiner locks // executing within this exec_ctx push_last_on_exec_ctx(exec_ctx, lock); + } else { + // there may be a race with setting here: if that happens, we may delay + // offload for one or two actions, and that's fine + gpr_atm initiator = + gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null); + if (initiator != 0 && initiator != (gpr_atm)exec_ctx) { + gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0); + } } + GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed + assert(cl->cb); + cl->error_data.error = error; + gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); GPR_TIMER_END("combiner.execute", 0); } diff --git a/tools/run_tests/python_utils/jobset.py b/tools/run_tests/python_utils/jobset.py index 3754035308..3ff773300f 100755 --- a/tools/run_tests/python_utils/jobset.py +++ b/tools/run_tests/python_utils/jobset.py @@ -41,6 +41,7 @@ import subprocess import sys import tempfile import time +import errno # cpu cost measurement @@ -132,29 +133,44 @@ _TAG_COLOR = { _FORMAT = '%(asctime)-15s %(message)s' logging.basicConfig(level=logging.INFO, format=_FORMAT) + +def eintr_be_gone(fn): + """Run fn until it doesn't stop because of EINTR""" + while True: + try: + return fn() + except IOError, e: + if e.errno != errno.EINTR: + raise + + + def message(tag, msg, explanatory_text=None, do_newline=False): if message.old_tag == tag and message.old_msg == msg and not explanatory_text: return message.old_tag = tag message.old_msg = msg - try: - if platform_string() == 'windows' or not sys.stdout.isatty(): - if explanatory_text: - logging.info(explanatory_text) - logging.info('%s: %s', tag, msg) - else: - sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % ( - _BEGINNING_OF_LINE, - _CLEAR_LINE, - '\n%s' % explanatory_text if explanatory_text is not None else '', - _COLORS[_TAG_COLOR[tag]][1], - _COLORS[_TAG_COLOR[tag]][0], - tag, - msg, - '\n' if do_newline or explanatory_text is not None else '')) - sys.stdout.flush() - except: - pass + while True: + try: + if platform_string() == 'windows' or not sys.stdout.isatty(): + if explanatory_text: + logging.info(explanatory_text) + logging.info('%s: %s', tag, msg) + else: + sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % ( + _BEGINNING_OF_LINE, + _CLEAR_LINE, + '\n%s' % explanatory_text if explanatory_text is not None else '', + _COLORS[_TAG_COLOR[tag]][1], + _COLORS[_TAG_COLOR[tag]][0], + tag, + msg, + '\n' if do_newline or explanatory_text is not None else '')) + sys.stdout.flush() + return + except IOError, e: + if e.errno != errno.EINTR: + raise message.old_tag = '' message.old_msg = '' @@ -226,16 +242,6 @@ class JobResult(object): self.cpu_measured = 0 -def eintr_be_gone(fn): - """Run fn until it doesn't stop because of EINTR""" - while True: - try: - return fn() - except IOError, e: - if e.errno != errno.EINTR: - raise - - def read_from_start(f): f.seek(0) return f.read() |