aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/lib/iomgr/combiner.c23
-rwxr-xr-xtools/run_tests/python_utils/jobset.py62
2 files changed, 53 insertions, 32 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 38eace12c7..b77a68eead 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -59,6 +59,11 @@ struct grpc_combiner {
grpc_closure_scheduler scheduler;
grpc_closure_scheduler finally_scheduler;
gpr_mpscq queue;
+ // either:
+ // a pointer to the initiating exec ctx if that is the only exec_ctx that has
+ // ever queued to this combiner, or NULL. If this is non-null, it's not
+ // dereferencable (since the initiating exec_ctx may have gone out of scope)
+ gpr_atm initiating_exec_ctx_or_null;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
@@ -168,15 +173,25 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
"C:%p grpc_combiner_execute c=%p last=%" PRIdPTR,
lock, cl, last));
- GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
- assert(cl->cb);
- cl->error_data.error = error;
- gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
if (last == 1) {
+ gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null,
+ (gpr_atm)exec_ctx);
// first element on this list: add it to the list of combiner locks
// executing within this exec_ctx
push_last_on_exec_ctx(exec_ctx, lock);
+ } else {
+ // there may be a race with setting here: if that happens, we may delay
+ // offload for one or two actions, and that's fine
+ gpr_atm initiator =
+ gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null);
+ if (initiator != 0 && initiator != (gpr_atm)exec_ctx) {
+ gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0);
+ }
}
+ GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
+ assert(cl->cb);
+ cl->error_data.error = error;
+ gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
GPR_TIMER_END("combiner.execute", 0);
}
diff --git a/tools/run_tests/python_utils/jobset.py b/tools/run_tests/python_utils/jobset.py
index 3754035308..3ff773300f 100755
--- a/tools/run_tests/python_utils/jobset.py
+++ b/tools/run_tests/python_utils/jobset.py
@@ -41,6 +41,7 @@ import subprocess
import sys
import tempfile
import time
+import errno
# cpu cost measurement
@@ -132,29 +133,44 @@ _TAG_COLOR = {
_FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(level=logging.INFO, format=_FORMAT)
+
+def eintr_be_gone(fn):
+ """Run fn until it doesn't stop because of EINTR"""
+ while True:
+ try:
+ return fn()
+ except IOError, e:
+ if e.errno != errno.EINTR:
+ raise
+
+
+
def message(tag, msg, explanatory_text=None, do_newline=False):
if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
return
message.old_tag = tag
message.old_msg = msg
- try:
- if platform_string() == 'windows' or not sys.stdout.isatty():
- if explanatory_text:
- logging.info(explanatory_text)
- logging.info('%s: %s', tag, msg)
- else:
- sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % (
- _BEGINNING_OF_LINE,
- _CLEAR_LINE,
- '\n%s' % explanatory_text if explanatory_text is not None else '',
- _COLORS[_TAG_COLOR[tag]][1],
- _COLORS[_TAG_COLOR[tag]][0],
- tag,
- msg,
- '\n' if do_newline or explanatory_text is not None else ''))
- sys.stdout.flush()
- except:
- pass
+ while True:
+ try:
+ if platform_string() == 'windows' or not sys.stdout.isatty():
+ if explanatory_text:
+ logging.info(explanatory_text)
+ logging.info('%s: %s', tag, msg)
+ else:
+ sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % (
+ _BEGINNING_OF_LINE,
+ _CLEAR_LINE,
+ '\n%s' % explanatory_text if explanatory_text is not None else '',
+ _COLORS[_TAG_COLOR[tag]][1],
+ _COLORS[_TAG_COLOR[tag]][0],
+ tag,
+ msg,
+ '\n' if do_newline or explanatory_text is not None else ''))
+ sys.stdout.flush()
+ return
+ except IOError, e:
+ if e.errno != errno.EINTR:
+ raise
message.old_tag = ''
message.old_msg = ''
@@ -226,16 +242,6 @@ class JobResult(object):
self.cpu_measured = 0
-def eintr_be_gone(fn):
- """Run fn until it doesn't stop because of EINTR"""
- while True:
- try:
- return fn()
- except IOError, e:
- if e.errno != errno.EINTR:
- raise
-
-
def read_from_start(f):
f.seek(0)
return f.read()