# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following disclaimer # in the documentation and/or other materials provided with the # distribution. # * Neither the name of Google Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from __future__ import absolute_import import collections import itertools import traceback import unittest from xml.etree import ElementTree import coverage from six import moves from tests import _loader class CaseResult(collections.namedtuple('CaseResult', [ 'id', 'name', 'kind', 'stdout', 'stderr', 'skip_reason', 'traceback'])): """A serializable result of a single test case. Attributes: id (object): Any serializable object used to denote the identity of this test case. name (str or None): A human-readable name of the test case. kind (CaseResult.Kind): The kind of test result. stdout (object or None): Output on stdout, or None if nothing was captured. stderr (object or None): Output on stderr, or None if nothing was captured. skip_reason (object or None): The reason the test was skipped. Must be something if self.kind is CaseResult.Kind.SKIP, else None. traceback (object or None): The traceback of the test. Must be something if self.kind is CaseResult.Kind.{ERROR, FAILURE, EXPECTED_FAILURE}, else None. """ class Kind: UNTESTED = 'untested' RUNNING = 'running' ERROR = 'error' FAILURE = 'failure' SUCCESS = 'success' SKIP = 'skip' EXPECTED_FAILURE = 'expected failure' UNEXPECTED_SUCCESS = 'unexpected success' def __new__(cls, id=None, name=None, kind=None, stdout=None, stderr=None, skip_reason=None, traceback=None): """Helper keyword constructor for the namedtuple. See this class' attributes for information on the arguments.""" assert id is not None assert name is None or isinstance(name, str) if kind is CaseResult.Kind.UNTESTED: pass elif kind is CaseResult.Kind.RUNNING: pass elif kind is CaseResult.Kind.ERROR: assert traceback is not None elif kind is CaseResult.Kind.FAILURE: assert traceback is not None elif kind is CaseResult.Kind.SUCCESS: pass elif kind is CaseResult.Kind.SKIP: assert skip_reason is not None elif kind is CaseResult.Kind.EXPECTED_FAILURE: assert traceback is not None elif kind is CaseResult.Kind.UNEXPECTED_SUCCESS: pass else: assert False return super(cls, CaseResult).__new__( cls, id, name, kind, stdout, stderr, skip_reason, traceback) def updated(self, name=None, kind=None, stdout=None, stderr=None, skip_reason=None, traceback=None): """Get a new validated CaseResult with the fields updated. See this class' attributes for information on the arguments.""" name = self.name if name is None else name kind = self.kind if kind is None else kind stdout = self.stdout if stdout is None else stdout stderr = self.stderr if stderr is None else stderr skip_reason = self.skip_reason if skip_reason is None else skip_reason traceback = self.traceback if traceback is None else traceback return CaseResult(id=self.id, name=name, kind=kind, stdout=stdout, stderr=stderr, skip_reason=skip_reason, traceback=traceback) class AugmentedResult(unittest.TestResult): """unittest.Result that keeps track of additional information. Uses CaseResult objects to store test-case results, providing additional information beyond that of the standard Python unittest library, such as standard output. Attributes: id_map (callable): A unary callable mapping unittest.TestCase objects to unique identifiers. cases (dict): A dictionary mapping from the identifiers returned by id_map to CaseResult objects corresponding to those IDs. """ def __init__(self, id_map): """Initialize the object with an identifier mapping. Arguments: id_map (callable): Corresponds to the attribute `id_map`.""" super(AugmentedResult, self).__init__() self.id_map = id_map self.cases = None def startTestRun(self): """See unittest.TestResult.startTestRun.""" super(AugmentedResult, self).startTestRun() self.cases = dict() def stopTestRun(self): """See unittest.TestResult.stopTestRun.""" super(AugmentedResult, self).stopTestRun() def startTest(self, test): """See unittest.TestResult.startTest.""" super(AugmentedResult, self).startTest(test) case_id = self.id_map(test) self.cases[case_id] = CaseResult( id=case_id, name=test.id(), kind=CaseResult.Kind.RUNNING) def addError(self, test, error): """See unittest.TestResult.addError.""" super(AugmentedResult, self).addError(test, error) case_id = self.id_map(test) self.cases[case_id] = self.cases[case_id].updated( kind=CaseResult.Kind.ERROR, traceback=error) def addFailure(self, test, error): """See unittest.TestResult.addFailure.""" super(AugmentedResult, self).addFailure(test, error) case_id = self.id_map(test) self.cases[case_id] = self.cases[case_id].updated( kind=CaseResult.Kind.FAILURE, traceback=error) def addSuccess(self, test): """See unittest.TestResult.addSuccess.""" super(AugmentedResult, self).addSuccess(test) case_id = self.id_map(test) self.cases[case_id] = self.cases[case_id].updated( kind=CaseResult.Kind.SUCCESS) def addSkip(self, test, reason): """See unittest.TestResult.addSkip.""" super(AugmentedResult, self).addSkip(test, reason) case_id = self.id_map(test) self.cases[case_id] = self.cases[case_id].updated( kind=CaseResult.Kind.SKIP, skip_reason=reason) def addExpectedFailure(self, test, error): """See unittest.TestResult.addExpectedFailure.""" super(AugmentedResult, self).addExpectedFailure(test, error) case_id = self.id_map(test) self.cases[case_id] = self.cases[case_id].updated( kind=CaseResult.Kind.EXPECTED_FAILURE, traceback=error) def addUnexpectedSuccess(self, test): """See unittest.TestResult.addUnexpectedSuccess.""" super(AugmentedResult, self).addUnexpectedSuccess(test) case_id = self.id_map(test) self.cases[case_id] = self.cases[case_id].updated( kind=CaseResult.Kind.UNEXPECTED_SUCCESS) def set_output(self, test, stdout, stderr): """Set the output attributes for the CaseResult corresponding to a test. Args: test (unittest.TestCase): The TestCase to set the outputs of. stdout (str): Output from stdout to assign to self.id_map(test). stderr (str): Output from stderr to assign to self.id_map(test). """ case_id = self.id_map(test) self.cases[case_id] = self.cases[case_id].updated( stdout=stdout.decode(), stderr=stderr.decode()) def augmented_results(self, filter): """Convenience method to retrieve filtered case results. Args: filter (callable): A unary predicate to filter over CaseResult objects. """ return (self.cases[case_id] for case_id in self.cases if filter(self.cases[case_id])) class CoverageResult(AugmentedResult): """Extension to AugmentedResult adding coverage.py support per test.\ Attributes: coverage_context (coverage.Coverage): coverage.py management object. """ def __init__(self, id_map): """See AugmentedResult.__init__.""" super(CoverageResult, self).__init__(id_map=id_map) self.coverage_context = None def startTest(self, test): """See unittest.TestResult.startTest. Additionally initializes and begins code coverage tracking.""" super(CoverageResult, self).startTest(test) self.coverage_context = coverage.Coverage(data_suffix=True) self.coverage_context.start() def stopTest(self, test): """See unittest.TestResult.stopTest. Additionally stops and deinitializes code coverage tracking.""" super(CoverageResult, self).stopTest(test) self.coverage_context.stop() self.coverage_context.save() self.coverage_context = None def stopTestRun(self): """See unittest.TestResult.stopTestRun.""" super(CoverageResult, self).stopTestRun() # TODO(atash): Dig deeper into why the following line fails to properly # combine coverage data from the Cython plugin. #coverage.Coverage().combine() class _Colors: """Namespaced constants for terminal color magic numbers.""" HEADER = '\033[95m' INFO = '\033[94m' OK = '\033[92m' WARN = '\033[93m' FAIL = '\033[91m' BOLD = '\033[1m' UNDERLINE = '\033[4m' END = '\033[0m' class TerminalResult(CoverageResult): """Extension to CoverageResult adding basic terminal reporting.""" def __init__(self, out, id_map): """Initialize the result object. Args: out (file-like): Output file to which terminal-colored live results will be written. id_map (callable): See AugmentedResult.__init__. """ super(TerminalResult, self).__init__(id_map=id_map) self.out = out def startTestRun(self): """See unittest.TestResult.startTestRun.""" super(TerminalResult, self).startTestRun() self.out.write( _Colors.HEADER + 'Testing gRPC Python...\n' + _Colors.END) def stopTestRun(self): """See unittest.TestResult.stopTestRun.""" super(TerminalResult, self).stopTestRun() self.out.write(summary(self)) self.out.flush() def addError(self, test, error): """See unittest.TestResult.addError.""" super(TerminalResult, self).addError(test, error) self.out.write( _Colors.FAIL + 'ERROR {}\n'.format(test.id()) + _Colors.END) self.out.flush() def addFailure(self, test, error): """See unittest.TestResult.addFailure.""" super(TerminalResult, self).addFailure(test, error) self.out.write( _Colors.FAIL + 'FAILURE {}\n'.format(test.id()) + _Colors.END) self.out.flush() def addSuccess(self, test): """See unittest.TestResult.addSuccess.""" super(TerminalResult, self).addSuccess(test) self.out.write( _Colors.OK + 'SUCCESS {}\n'.format(test.id()) + _Colors.END) self.out.flush() def addSkip(self, test, reason): """See unittest.TestResult.addSkip.""" super(TerminalResult, self).addSkip(test, reason) self.out.write( _Colors.INFO + 'SKIP {}\n'.format(test.id()) + _Colors.END) self.out.flush() def addExpectedFailure(self, test, error): """See unittest.TestResult.addExpectedFailure.""" super(TerminalResult, self).addExpectedFailure(test, error) self.out.write( _Colors.INFO + 'FAILURE_OK {}\n'.format(test.id()) + _Colors.END) self.out.flush() def addUnexpectedSuccess(self, test): """See unittest.TestResult.addUnexpectedSuccess.""" super(TerminalResult, self).addUnexpectedSuccess(test) self.out.write( _Colors.INFO + 'UNEXPECTED_OK {}\n'.format(test.id()) + _Colors.END) self.out.flush() def _traceback_string(type, value, trace): """Generate a descriptive string of a Python exception traceback. Args: type (class): The type of the exception. value (Exception): The value of the exception. trace (traceback): Traceback of the exception. Returns: str: Formatted exception descriptive string. """ buffer = moves.cStringIO() traceback.print_exception(type, value, trace, file=buffer) return buffer.getvalue() def summary(result): """A summary string of a result object. Args: result (AugmentedResult): The result object to get the summary of. Returns: str: The summary string. """ assert isinstance(result, AugmentedResult) untested = list(result.augmented_results( lambda case_result: case_result.kind is CaseResult.Kind.UNTESTED)) running = list(result.augmented_results( lambda case_result: case_result.kind is CaseResult.Kind.RUNNING)) failures = list(result.augmented_results( lambda case_result: case_result.kind is CaseResult.Kind.FAILURE)) errors = list(result.augmented_results( lambda case_result: case_result.kind is CaseResult.Kind.ERROR)) successes = list(result.augmented_results( lambda case_result: case_result.kind is CaseResult.Kind.SUCCESS)) skips = list(result.augmented_results( lambda case_result: case_result.kind is CaseResult.Kind.SKIP)) expected_failures = list(result.augmented_results( lambda case_result: case_result.kind is CaseResult.Kind.EXPECTED_FAILURE)) unexpected_successes = list(result.augmented_results( lambda case_result: case_result.kind is CaseResult.Kind.UNEXPECTED_SUCCESS)) running_names = [case.name for case in running] finished_count = (len(failures) + len(errors) + len(successes) + len(expected_failures) + len(unexpected_successes)) statistics = ( '{finished} tests finished:\n' '\t{successful} successful\n' '\t{unsuccessful} unsuccessful\n' '\t{skipped} skipped\n' '\t{expected_fail} expected failures\n' '\t{unexpected_successful} unexpected successes\n' 'Interrupted Tests:\n' '\t{interrupted}\n' .format(finished=finished_count, successful=len(successes), unsuccessful=(len(failures)+len(errors)), skipped=len(skips), expected_fail=len(expected_failures), unexpected_successful=len(unexpected_successes), interrupted=str(running_names))) tracebacks = '\n\n'.join([ (_Colors.FAIL + '{test_name}' + _Colors.END + '\n' + _Colors.BOLD + 'traceback:' + _Colors.END + '\n' + '{traceback}\n' + _Colors.BOLD + 'stdout:' + _Colors.END + '\n' + '{stdout}\n' + _Colors.BOLD + 'stderr:' + _Colors.END + '\n' + '{stderr}\n').format( test_name=result.name, traceback=_traceback_string(*result.traceback), stdout=result.stdout, stderr=result.stderr) for result in itertools.chain(failures, errors) ]) notes = 'Unexpected successes: {}\n'.format([ result.name for result in unexpected_successes]) return statistics + '\nErrors/Failures: \n' + tracebacks + '\n' + notes def jenkins_junit_xml(result): """An XML tree object that when written is recognizable by Jenkins. Args: result (AugmentedResult): The result object to get the junit xml output of. Returns: ElementTree.ElementTree: The XML tree. """ assert isinstance(result, AugmentedResult) root = ElementTree.Element('testsuites') suite = ElementTree.SubElement(root, 'testsuite', { 'name': 'Python gRPC tests', }) for case in result.cases.values(): if case.kind is CaseResult.Kind.SUCCESS: ElementTree.SubElement(suite, 'testcase', { 'name': case.name, }) elif case.kind in (CaseResult.Kind.ERROR, CaseResult.Kind.FAILURE): case_xml = ElementTree.SubElement(suite, 'testcase', { 'name': case.name, }) error_xml = ElementTree.SubElement(case_xml, 'error', {}) error_xml.text = ''.format(case.stderr, case.traceback) return ElementTree.ElementTree(element=root)