aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/_runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_tests/tests/_runner.py')
-rw-r--r--src/python/grpcio_tests/tests/_runner.py273
1 files changed, 137 insertions, 136 deletions
diff --git a/src/python/grpcio_tests/tests/_runner.py b/src/python/grpcio_tests/tests/_runner.py
index 926dcbe23a..59964b271c 100644
--- a/src/python/grpcio_tests/tests/_runner.py
+++ b/src/python/grpcio_tests/tests/_runner.py
@@ -49,7 +49,7 @@ from tests import _result
class CaptureFile(object):
- """A context-managed file to redirect output to a byte array.
+ """A context-managed file to redirect output to a byte array.
Use by invoking `start` (`__enter__`) and at some point invoking `stop`
(`__exit__`). At any point after the initial call to `start` call `output` to
@@ -66,57 +66,56 @@ class CaptureFile(object):
Only non-None when self is started.
"""
- def __init__(self, fd):
- self._redirected_fd = fd
- self._saved_fd = os.dup(self._redirected_fd)
- self._into_file = None
+ def __init__(self, fd):
+ self._redirected_fd = fd
+ self._saved_fd = os.dup(self._redirected_fd)
+ self._into_file = None
- def output(self):
- """Get all output from the redirected-to file if it exists."""
- if self._into_file:
- self._into_file.seek(0)
- return bytes(self._into_file.read())
- else:
- return bytes()
+ def output(self):
+ """Get all output from the redirected-to file if it exists."""
+ if self._into_file:
+ self._into_file.seek(0)
+ return bytes(self._into_file.read())
+ else:
+ return bytes()
- def start(self):
- """Start redirection of writes to the file descriptor."""
- self._into_file = tempfile.TemporaryFile()
- os.dup2(self._into_file.fileno(), self._redirected_fd)
+ def start(self):
+ """Start redirection of writes to the file descriptor."""
+ self._into_file = tempfile.TemporaryFile()
+ os.dup2(self._into_file.fileno(), self._redirected_fd)
- def stop(self):
- """Stop redirection of writes to the file descriptor."""
- # n.b. this dup2 call auto-closes self._redirected_fd
- os.dup2(self._saved_fd, self._redirected_fd)
+ def stop(self):
+ """Stop redirection of writes to the file descriptor."""
+ # n.b. this dup2 call auto-closes self._redirected_fd
+ os.dup2(self._saved_fd, self._redirected_fd)
- def write_bypass(self, value):
- """Bypass the redirection and write directly to the original file.
+ def write_bypass(self, value):
+ """Bypass the redirection and write directly to the original file.
Arguments:
value (str): What to write to the original file.
"""
- if six.PY3 and not isinstance(value, six.binary_type):
- value = bytes(value, 'ascii')
- if self._saved_fd is None:
- os.write(self._redirect_fd, value)
- else:
- os.write(self._saved_fd, value)
+ if six.PY3 and not isinstance(value, six.binary_type):
+ value = bytes(value, 'ascii')
+ if self._saved_fd is None:
+ os.write(self._redirect_fd, value)
+ else:
+ os.write(self._saved_fd, value)
- def __enter__(self):
- self.start()
- return self
+ def __enter__(self):
+ self.start()
+ return self
- def __exit__(self, type, value, traceback):
- self.stop()
+ def __exit__(self, type, value, traceback):
+ self.stop()
- def close(self):
- """Close any resources used by self not closed by stop()."""
- os.close(self._saved_fd)
+ def close(self):
+ """Close any resources used by self not closed by stop()."""
+ os.close(self._saved_fd)
-class AugmentedCase(collections.namedtuple('AugmentedCase', [
- 'case', 'id'])):
- """A test case with a guaranteed unique externally specified identifier.
+class AugmentedCase(collections.namedtuple('AugmentedCase', ['case', 'id'])):
+ """A test case with a guaranteed unique externally specified identifier.
Attributes:
case (unittest.TestCase): TestCase we're decorating with an additional
@@ -125,105 +124,107 @@ class AugmentedCase(collections.namedtuple('AugmentedCase', [
purposes.
"""
- def __new__(cls, case, id=None):
- if id is None:
- id = uuid.uuid4()
- return super(cls, AugmentedCase).__new__(cls, case, id)
+ def __new__(cls, case, id=None):
+ if id is None:
+ id = uuid.uuid4()
+ return super(cls, AugmentedCase).__new__(cls, case, id)
class Runner(object):
- def run(self, suite):
- """See setuptools' test_runner setup argument for information."""
- # only run test cases with id starting with given prefix
- testcase_filter = os.getenv('GRPC_PYTHON_TESTRUNNER_FILTER')
- filtered_cases = []
- for case in _loader.iterate_suite_cases(suite):
- if not testcase_filter or case.id().startswith(testcase_filter):
- filtered_cases.append(case)
-
- # Ensure that every test case has no collision with any other test case in
- # the augmented results.
- augmented_cases = [AugmentedCase(case, uuid.uuid4())
- for case in filtered_cases]
- case_id_by_case = dict((augmented_case.case, augmented_case.id)
- for augmented_case in augmented_cases)
- result_out = moves.cStringIO()
- result = _result.TerminalResult(
- result_out, id_map=lambda case: case_id_by_case[case])
- stdout_pipe = CaptureFile(sys.stdout.fileno())
- stderr_pipe = CaptureFile(sys.stderr.fileno())
- kill_flag = [False]
-
- def sigint_handler(signal_number, frame):
- if signal_number == signal.SIGINT:
- kill_flag[0] = True # Python 2.7 not having 'local'... :-(
- signal.signal(signal_number, signal.SIG_DFL)
-
- def fault_handler(signal_number, frame):
- stdout_pipe.write_bypass(
- 'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n'
- .format(signal_number, stdout_pipe.output(),
- stderr_pipe.output()))
- os._exit(1)
-
- def check_kill_self():
- if kill_flag[0]:
- stdout_pipe.write_bypass('Stopping tests short...')
- result.stopTestRun()
- stdout_pipe.write_bypass(result_out.getvalue())
- stdout_pipe.write_bypass(
- '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output().decode()))
- stderr_pipe.write_bypass(
- '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output().decode()))
- os._exit(1)
- def try_set_handler(name, handler):
- try:
- signal.signal(getattr(signal, name), handler)
- except AttributeError:
- pass
- try_set_handler('SIGINT', sigint_handler)
- try_set_handler('SIGSEGV', fault_handler)
- try_set_handler('SIGBUS', fault_handler)
- try_set_handler('SIGABRT', fault_handler)
- try_set_handler('SIGFPE', fault_handler)
- try_set_handler('SIGILL', fault_handler)
- # Sometimes output will lag after a test has successfully finished; we
- # ignore such writes to our pipes.
- try_set_handler('SIGPIPE', signal.SIG_IGN)
-
- # Run the tests
- result.startTestRun()
- for augmented_case in augmented_cases:
- sys.stdout.write('Running {}\n'.format(augmented_case.case.id()))
- sys.stdout.flush()
- case_thread = threading.Thread(
- target=augmented_case.case.run, args=(result,))
- try:
- with stdout_pipe, stderr_pipe:
- case_thread.start()
- while case_thread.is_alive():
+ def run(self, suite):
+ """See setuptools' test_runner setup argument for information."""
+ # only run test cases with id starting with given prefix
+ testcase_filter = os.getenv('GRPC_PYTHON_TESTRUNNER_FILTER')
+ filtered_cases = []
+ for case in _loader.iterate_suite_cases(suite):
+ if not testcase_filter or case.id().startswith(testcase_filter):
+ filtered_cases.append(case)
+
+ # Ensure that every test case has no collision with any other test case in
+ # the augmented results.
+ augmented_cases = [
+ AugmentedCase(case, uuid.uuid4()) for case in filtered_cases
+ ]
+ case_id_by_case = dict((augmented_case.case, augmented_case.id)
+ for augmented_case in augmented_cases)
+ result_out = moves.cStringIO()
+ result = _result.TerminalResult(
+ result_out, id_map=lambda case: case_id_by_case[case])
+ stdout_pipe = CaptureFile(sys.stdout.fileno())
+ stderr_pipe = CaptureFile(sys.stderr.fileno())
+ kill_flag = [False]
+
+ def sigint_handler(signal_number, frame):
+ if signal_number == signal.SIGINT:
+ kill_flag[0] = True # Python 2.7 not having 'local'... :-(
+ signal.signal(signal_number, signal.SIG_DFL)
+
+ def fault_handler(signal_number, frame):
+ stdout_pipe.write_bypass(
+ 'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n'.format(
+ signal_number, stdout_pipe.output(), stderr_pipe.output()))
+ os._exit(1)
+
+ def check_kill_self():
+ if kill_flag[0]:
+ stdout_pipe.write_bypass('Stopping tests short...')
+ result.stopTestRun()
+ stdout_pipe.write_bypass(result_out.getvalue())
+ stdout_pipe.write_bypass('\ninterrupted stdout:\n{}\n'.format(
+ stdout_pipe.output().decode()))
+ stderr_pipe.write_bypass('\ninterrupted stderr:\n{}\n'.format(
+ stderr_pipe.output().decode()))
+ os._exit(1)
+
+ def try_set_handler(name, handler):
+ try:
+ signal.signal(getattr(signal, name), handler)
+ except AttributeError:
+ pass
+
+ try_set_handler('SIGINT', sigint_handler)
+ try_set_handler('SIGSEGV', fault_handler)
+ try_set_handler('SIGBUS', fault_handler)
+ try_set_handler('SIGABRT', fault_handler)
+ try_set_handler('SIGFPE', fault_handler)
+ try_set_handler('SIGILL', fault_handler)
+ # Sometimes output will lag after a test has successfully finished; we
+ # ignore such writes to our pipes.
+ try_set_handler('SIGPIPE', signal.SIG_IGN)
+
+ # Run the tests
+ result.startTestRun()
+ for augmented_case in augmented_cases:
+ sys.stdout.write('Running {}\n'.format(augmented_case.case.id(
+ )))
+ sys.stdout.flush()
+ case_thread = threading.Thread(
+ target=augmented_case.case.run, args=(result,))
+ try:
+ with stdout_pipe, stderr_pipe:
+ case_thread.start()
+ while case_thread.is_alive():
+ check_kill_self()
+ time.sleep(0)
+ case_thread.join()
+ except:
+ # re-raise the exception after forcing the with-block to end
+ raise
+ result.set_output(augmented_case.case,
+ stdout_pipe.output(), stderr_pipe.output())
+ sys.stdout.write(result_out.getvalue())
+ sys.stdout.flush()
+ result_out.truncate(0)
check_kill_self()
- time.sleep(0)
- case_thread.join()
- except:
- # re-raise the exception after forcing the with-block to end
- raise
- result.set_output(
- augmented_case.case, stdout_pipe.output(), stderr_pipe.output())
- sys.stdout.write(result_out.getvalue())
- sys.stdout.flush()
- result_out.truncate(0)
- check_kill_self()
- result.stopTestRun()
- stdout_pipe.close()
- stderr_pipe.close()
-
- # Report results
- sys.stdout.write(result_out.getvalue())
- sys.stdout.flush()
- signal.signal(signal.SIGINT, signal.SIG_DFL)
- with open('report.xml', 'wb') as report_xml_file:
- _result.jenkins_junit_xml(result).write(report_xml_file)
- return result
-
+ result.stopTestRun()
+ stdout_pipe.close()
+ stderr_pipe.close()
+
+ # Report results
+ sys.stdout.write(result_out.getvalue())
+ sys.stdout.flush()
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ with open('report.xml', 'wb') as report_xml_file:
+ _result.jenkins_junit_xml(result).write(report_xml_file)
+ return result