diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/_result.py')
-rw-r--r-- | src/python/grpcio_tests/tests/_result.py | 453 |
1 files changed, 453 insertions, 0 deletions
diff --git a/src/python/grpcio_tests/tests/_result.py b/src/python/grpcio_tests/tests/_result.py new file mode 100644 index 0000000000..1acec6a9b5 --- /dev/null +++ b/src/python/grpcio_tests/tests/_result.py @@ -0,0 +1,453 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from __future__ import absolute_import + +import collections +import itertools +import traceback +import unittest +from xml.etree import ElementTree + +import coverage +from six import moves + +from tests import _loader + + +class CaseResult(collections.namedtuple('CaseResult', [ + 'id', 'name', 'kind', 'stdout', 'stderr', 'skip_reason', 'traceback'])): + """A serializable result of a single test case. + + Attributes: + id (object): Any serializable object used to denote the identity of this + test case. + name (str or None): A human-readable name of the test case. + kind (CaseResult.Kind): The kind of test result. + stdout (object or None): Output on stdout, or None if nothing was captured. + stderr (object or None): Output on stderr, or None if nothing was captured. + skip_reason (object or None): The reason the test was skipped. Must be + something if self.kind is CaseResult.Kind.SKIP, else None. + traceback (object or None): The traceback of the test. Must be something if + self.kind is CaseResult.Kind.{ERROR, FAILURE, EXPECTED_FAILURE}, else + None. + """ + + class Kind: + UNTESTED = 'untested' + RUNNING = 'running' + ERROR = 'error' + FAILURE = 'failure' + SUCCESS = 'success' + SKIP = 'skip' + EXPECTED_FAILURE = 'expected failure' + UNEXPECTED_SUCCESS = 'unexpected success' + + def __new__(cls, id=None, name=None, kind=None, stdout=None, stderr=None, + skip_reason=None, traceback=None): + """Helper keyword constructor for the namedtuple. + + See this class' attributes for information on the arguments.""" + assert id is not None + assert name is None or isinstance(name, str) + if kind is CaseResult.Kind.UNTESTED: + pass + elif kind is CaseResult.Kind.RUNNING: + pass + elif kind is CaseResult.Kind.ERROR: + assert traceback is not None + elif kind is CaseResult.Kind.FAILURE: + assert traceback is not None + elif kind is CaseResult.Kind.SUCCESS: + pass + elif kind is CaseResult.Kind.SKIP: + assert skip_reason is not None + elif kind is CaseResult.Kind.EXPECTED_FAILURE: + assert traceback is not None + elif kind is CaseResult.Kind.UNEXPECTED_SUCCESS: + pass + else: + assert False + return super(cls, CaseResult).__new__( + cls, id, name, kind, stdout, stderr, skip_reason, traceback) + + def updated(self, name=None, kind=None, stdout=None, stderr=None, + skip_reason=None, traceback=None): + """Get a new validated CaseResult with the fields updated. + + See this class' attributes for information on the arguments.""" + name = self.name if name is None else name + kind = self.kind if kind is None else kind + stdout = self.stdout if stdout is None else stdout + stderr = self.stderr if stderr is None else stderr + skip_reason = self.skip_reason if skip_reason is None else skip_reason + traceback = self.traceback if traceback is None else traceback + return CaseResult(id=self.id, name=name, kind=kind, stdout=stdout, + stderr=stderr, skip_reason=skip_reason, + traceback=traceback) + + +class AugmentedResult(unittest.TestResult): + """unittest.Result that keeps track of additional information. + + Uses CaseResult objects to store test-case results, providing additional + information beyond that of the standard Python unittest library, such as + standard output. + + Attributes: + id_map (callable): A unary callable mapping unittest.TestCase objects to + unique identifiers. + cases (dict): A dictionary mapping from the identifiers returned by id_map + to CaseResult objects corresponding to those IDs. + """ + + def __init__(self, id_map): + """Initialize the object with an identifier mapping. + + Arguments: + id_map (callable): Corresponds to the attribute `id_map`.""" + super(AugmentedResult, self).__init__() + self.id_map = id_map + self.cases = None + + def startTestRun(self): + """See unittest.TestResult.startTestRun.""" + super(AugmentedResult, self).startTestRun() + self.cases = dict() + + def stopTestRun(self): + """See unittest.TestResult.stopTestRun.""" + super(AugmentedResult, self).stopTestRun() + + def startTest(self, test): + """See unittest.TestResult.startTest.""" + super(AugmentedResult, self).startTest(test) + case_id = self.id_map(test) + self.cases[case_id] = CaseResult( + id=case_id, name=test.id(), kind=CaseResult.Kind.RUNNING) + + def addError(self, test, error): + """See unittest.TestResult.addError.""" + super(AugmentedResult, self).addError(test, error) + case_id = self.id_map(test) + self.cases[case_id] = self.cases[case_id].updated( + kind=CaseResult.Kind.ERROR, traceback=error) + + def addFailure(self, test, error): + """See unittest.TestResult.addFailure.""" + super(AugmentedResult, self).addFailure(test, error) + case_id = self.id_map(test) + self.cases[case_id] = self.cases[case_id].updated( + kind=CaseResult.Kind.FAILURE, traceback=error) + + def addSuccess(self, test): + """See unittest.TestResult.addSuccess.""" + super(AugmentedResult, self).addSuccess(test) + case_id = self.id_map(test) + self.cases[case_id] = self.cases[case_id].updated( + kind=CaseResult.Kind.SUCCESS) + + def addSkip(self, test, reason): + """See unittest.TestResult.addSkip.""" + super(AugmentedResult, self).addSkip(test, reason) + case_id = self.id_map(test) + self.cases[case_id] = self.cases[case_id].updated( + kind=CaseResult.Kind.SKIP, skip_reason=reason) + + def addExpectedFailure(self, test, error): + """See unittest.TestResult.addExpectedFailure.""" + super(AugmentedResult, self).addExpectedFailure(test, error) + case_id = self.id_map(test) + self.cases[case_id] = self.cases[case_id].updated( + kind=CaseResult.Kind.EXPECTED_FAILURE, traceback=error) + + def addUnexpectedSuccess(self, test): + """See unittest.TestResult.addUnexpectedSuccess.""" + super(AugmentedResult, self).addUnexpectedSuccess(test) + case_id = self.id_map(test) + self.cases[case_id] = self.cases[case_id].updated( + kind=CaseResult.Kind.UNEXPECTED_SUCCESS) + + def set_output(self, test, stdout, stderr): + """Set the output attributes for the CaseResult corresponding to a test. + + Args: + test (unittest.TestCase): The TestCase to set the outputs of. + stdout (str): Output from stdout to assign to self.id_map(test). + stderr (str): Output from stderr to assign to self.id_map(test). + """ + case_id = self.id_map(test) + self.cases[case_id] = self.cases[case_id].updated( + stdout=stdout.decode(), stderr=stderr.decode()) + + def augmented_results(self, filter): + """Convenience method to retrieve filtered case results. + + Args: + filter (callable): A unary predicate to filter over CaseResult objects. + """ + return (self.cases[case_id] for case_id in self.cases + if filter(self.cases[case_id])) + + +class CoverageResult(AugmentedResult): + """Extension to AugmentedResult adding coverage.py support per test.\ + + Attributes: + coverage_context (coverage.Coverage): coverage.py management object. + """ + + def __init__(self, id_map): + """See AugmentedResult.__init__.""" + super(CoverageResult, self).__init__(id_map=id_map) + self.coverage_context = None + + def startTest(self, test): + """See unittest.TestResult.startTest. + + Additionally initializes and begins code coverage tracking.""" + super(CoverageResult, self).startTest(test) + self.coverage_context = coverage.Coverage(data_suffix=True) + self.coverage_context.start() + + def stopTest(self, test): + """See unittest.TestResult.stopTest. + + Additionally stops and deinitializes code coverage tracking.""" + super(CoverageResult, self).stopTest(test) + self.coverage_context.stop() + self.coverage_context.save() + self.coverage_context = None + + def stopTestRun(self): + """See unittest.TestResult.stopTestRun.""" + super(CoverageResult, self).stopTestRun() + # TODO(atash): Dig deeper into why the following line fails to properly + # combine coverage data from the Cython plugin. + #coverage.Coverage().combine() + + +class _Colors: + """Namespaced constants for terminal color magic numbers.""" + HEADER = '\033[95m' + INFO = '\033[94m' + OK = '\033[92m' + WARN = '\033[93m' + FAIL = '\033[91m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + END = '\033[0m' + + +class TerminalResult(CoverageResult): + """Extension to CoverageResult adding basic terminal reporting.""" + + def __init__(self, out, id_map): + """Initialize the result object. + + Args: + out (file-like): Output file to which terminal-colored live results will + be written. + id_map (callable): See AugmentedResult.__init__. + """ + super(TerminalResult, self).__init__(id_map=id_map) + self.out = out + + def startTestRun(self): + """See unittest.TestResult.startTestRun.""" + super(TerminalResult, self).startTestRun() + self.out.write( + _Colors.HEADER + + 'Testing gRPC Python...\n' + + _Colors.END) + + def stopTestRun(self): + """See unittest.TestResult.stopTestRun.""" + super(TerminalResult, self).stopTestRun() + self.out.write(summary(self)) + self.out.flush() + + def addError(self, test, error): + """See unittest.TestResult.addError.""" + super(TerminalResult, self).addError(test, error) + self.out.write( + _Colors.FAIL + + 'ERROR {}\n'.format(test.id()) + + _Colors.END) + self.out.flush() + + def addFailure(self, test, error): + """See unittest.TestResult.addFailure.""" + super(TerminalResult, self).addFailure(test, error) + self.out.write( + _Colors.FAIL + + 'FAILURE {}\n'.format(test.id()) + + _Colors.END) + self.out.flush() + + def addSuccess(self, test): + """See unittest.TestResult.addSuccess.""" + super(TerminalResult, self).addSuccess(test) + self.out.write( + _Colors.OK + + 'SUCCESS {}\n'.format(test.id()) + + _Colors.END) + self.out.flush() + + def addSkip(self, test, reason): + """See unittest.TestResult.addSkip.""" + super(TerminalResult, self).addSkip(test, reason) + self.out.write( + _Colors.INFO + + 'SKIP {}\n'.format(test.id()) + + _Colors.END) + self.out.flush() + + def addExpectedFailure(self, test, error): + """See unittest.TestResult.addExpectedFailure.""" + super(TerminalResult, self).addExpectedFailure(test, error) + self.out.write( + _Colors.INFO + + 'FAILURE_OK {}\n'.format(test.id()) + + _Colors.END) + self.out.flush() + + def addUnexpectedSuccess(self, test): + """See unittest.TestResult.addUnexpectedSuccess.""" + super(TerminalResult, self).addUnexpectedSuccess(test) + self.out.write( + _Colors.INFO + + 'UNEXPECTED_OK {}\n'.format(test.id()) + + _Colors.END) + self.out.flush() + +def _traceback_string(type, value, trace): + """Generate a descriptive string of a Python exception traceback. + + Args: + type (class): The type of the exception. + value (Exception): The value of the exception. + trace (traceback): Traceback of the exception. + + Returns: + str: Formatted exception descriptive string. + """ + buffer = moves.cStringIO() + traceback.print_exception(type, value, trace, file=buffer) + return buffer.getvalue() + +def summary(result): + """A summary string of a result object. + + Args: + result (AugmentedResult): The result object to get the summary of. + + Returns: + str: The summary string. + """ + assert isinstance(result, AugmentedResult) + untested = list(result.augmented_results( + lambda case_result: case_result.kind is CaseResult.Kind.UNTESTED)) + running = list(result.augmented_results( + lambda case_result: case_result.kind is CaseResult.Kind.RUNNING)) + failures = list(result.augmented_results( + lambda case_result: case_result.kind is CaseResult.Kind.FAILURE)) + errors = list(result.augmented_results( + lambda case_result: case_result.kind is CaseResult.Kind.ERROR)) + successes = list(result.augmented_results( + lambda case_result: case_result.kind is CaseResult.Kind.SUCCESS)) + skips = list(result.augmented_results( + lambda case_result: case_result.kind is CaseResult.Kind.SKIP)) + expected_failures = list(result.augmented_results( + lambda case_result: case_result.kind is CaseResult.Kind.EXPECTED_FAILURE)) + unexpected_successes = list(result.augmented_results( + lambda case_result: case_result.kind is CaseResult.Kind.UNEXPECTED_SUCCESS)) + running_names = [case.name for case in running] + finished_count = (len(failures) + len(errors) + len(successes) + + len(expected_failures) + len(unexpected_successes)) + statistics = ( + '{finished} tests finished:\n' + '\t{successful} successful\n' + '\t{unsuccessful} unsuccessful\n' + '\t{skipped} skipped\n' + '\t{expected_fail} expected failures\n' + '\t{unexpected_successful} unexpected successes\n' + 'Interrupted Tests:\n' + '\t{interrupted}\n' + .format(finished=finished_count, + successful=len(successes), + unsuccessful=(len(failures)+len(errors)), + skipped=len(skips), + expected_fail=len(expected_failures), + unexpected_successful=len(unexpected_successes), + interrupted=str(running_names))) + tracebacks = '\n\n'.join([ + (_Colors.FAIL + '{test_name}' + _Colors.END + '\n' + + _Colors.BOLD + 'traceback:' + _Colors.END + '\n' + + '{traceback}\n' + + _Colors.BOLD + 'stdout:' + _Colors.END + '\n' + + '{stdout}\n' + + _Colors.BOLD + 'stderr:' + _Colors.END + '\n' + + '{stderr}\n').format( + test_name=result.name, + traceback=_traceback_string(*result.traceback), + stdout=result.stdout, stderr=result.stderr) + for result in itertools.chain(failures, errors) + ]) + notes = 'Unexpected successes: {}\n'.format([ + result.name for result in unexpected_successes]) + return statistics + '\nErrors/Failures: \n' + tracebacks + '\n' + notes + + +def jenkins_junit_xml(result): + """An XML tree object that when written is recognizable by Jenkins. + + Args: + result (AugmentedResult): The result object to get the junit xml output of. + + Returns: + ElementTree.ElementTree: The XML tree. + """ + assert isinstance(result, AugmentedResult) + root = ElementTree.Element('testsuites') + suite = ElementTree.SubElement(root, 'testsuite', { + 'name': 'Python gRPC tests', + }) + for case in result.cases.values(): + if case.kind is CaseResult.Kind.SUCCESS: + ElementTree.SubElement(suite, 'testcase', { + 'name': case.name, + }) + elif case.kind in (CaseResult.Kind.ERROR, CaseResult.Kind.FAILURE): + case_xml = ElementTree.SubElement(suite, 'testcase', { + 'name': case.name, + }) + error_xml = ElementTree.SubElement(case_xml, 'error', {}) + error_xml.text = ''.format(case.stderr, case.traceback) + return ElementTree.ElementTree(element=root) |