diff options
author | 2016-01-27 19:57:58 -0800 | |
---|---|---|
committer | 2016-01-27 19:57:58 -0800 | |
commit | b4e51b52bdd3a41f44e4959d08957f7142c9c56a (patch) | |
tree | 699df3ca4dfe307dc9e139dab565dc1db1a700fe /src/python | |
parent | 5a9462339dd35de13acd44e7c8a001ac727c038e (diff) | |
parent | f846aaf41baf34d68f9aa7e3daa2c65cd75dd7f1 (diff) |
Merge branch 'master' of github.com:grpc/grpc into sync-async-plus-interfaces
Diffstat (limited to 'src/python')
7 files changed, 345 insertions, 32 deletions
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index bd12c5579c..a6b8ad3fc0 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -30,16 +30,25 @@ """Provides distutils command classes for the GRPC Python setup process.""" import distutils +import glob import os import os.path +import platform import re +import shutil import subprocess import sys +import traceback import setuptools +from setuptools.command import bdist_egg +from setuptools.command import build_ext from setuptools.command import build_py +from setuptools.command import easy_install +from setuptools.command import install from setuptools.command import test -from setuptools.command import build_ext + +import support PYTHON_STEM = os.path.dirname(os.path.abspath(__file__)) @@ -56,6 +65,129 @@ class CommandError(Exception): """Simple exception class for GRPC custom commands.""" +# TODO(atash): Remove this once PyPI has better Linux bdist support. See +# https://bitbucket.org/pypa/pypi/issues/120/binary-wheels-for-linux-are-not-supported +def _get_linux_bdist_egg(decorated_basename, target_egg_basename): + """Returns a string path to a .egg file for Linux to install. + + If we can retrieve a pre-compiled egg from online, uses it. Else, emits a + warning and builds from source. + """ + # Break import style to ensure that setup.py has had a chance to install the + # relevant package eggs. + from six.moves.urllib import request + decorated_path = decorated_basename + '.egg' + try: + url = ( + 'https://storage.googleapis.com/grpc-precompiled-binaries/' + 'python/{target}' + .format(target=decorated_path)) + egg_data = request.urlopen(url).read() + except IOError as error: + raise CommandError( + '{}\n\nCould not find the bdist egg {}: {}' + .format(traceback.format_exc(), decorated_path, error.message)) + # Our chosen local egg path. + egg_path = target_egg_basename + '.egg' + try: + with open(egg_path, 'w') as egg_file: + egg_file.write(egg_data) + except IOError as error: + raise CommandError( + '{}\n\nCould not write grpcio egg: {}' + .format(traceback.format_exc(), error.message)) + return egg_path + + +class EggNameMixin(object): + + def egg_name(self, with_custom): + """ + Args: + with_custom: Boolean describing whether or not to decorate the egg name + with custom gRPC-specific target information. + """ + egg_command = self.get_finalized_command('bdist_egg') + base = os.path.splitext(os.path.basename(egg_command.egg_output))[0] + if with_custom: + flavor = 'ucs2' if sys.maxunicode == 65535 else 'ucs4' + return '{base}-{flavor}'.format(base=base, flavor=flavor) + else: + return base + + +class Install(install.install, EggNameMixin): + """Custom Install command for gRPC Python. + + This is for bdist shims and whatever else we might need a custom install + command for. + """ + + user_options = install.install.user_options + [ + # TODO(atash): remove this once manylinux gets on PyPI. See + # https://bitbucket.org/pypa/pypi/issues/120/binary-wheels-for-linux-are-not-supported + ('use-linux-bdist', None, + 'Whether to retrieve a binary for Linux instead of building from ' + 'source.'), + ] + + def initialize_options(self): + install.install.initialize_options(self) + self.use_linux_bdist = False + + def finalize_options(self): + install.install.finalize_options(self) + + def run(self): + if self.use_linux_bdist: + try: + egg_path = _get_linux_bdist_egg(self.egg_name(True), + self.egg_name(False)) + except CommandError as error: + sys.stderr.write( + '\nWARNING: Failed to acquire grpcio prebuilt binary:\n' + '{}.\n\n'.format(error.message)) + raise + try: + self._run_bdist_retrieval_install(egg_path) + except Exception as error: + # if anything else happens (and given how there's no way to really know + # what's happening in setuptools here, I mean *anything*), warn the user + # and fall back to building from source. + sys.stderr.write( + '{}\nWARNING: Failed to install grpcio prebuilt binary.\n\n' + .format(traceback.format_exc())) + install.install.run(self) + else: + install.install.run(self) + + # TODO(atash): Remove this once PyPI has better Linux bdist support. See + # https://bitbucket.org/pypa/pypi/issues/120/binary-wheels-for-linux-are-not-supported + def _run_bdist_retrieval_install(self, bdist_egg): + easy_install = self.distribution.get_command_class('easy_install') + easy_install_command = easy_install( + self.distribution, args='x', root=self.root, record=self.record, + ) + easy_install_command.ensure_finalized() + easy_install_command.always_copy_from = '.' + easy_install_command.package_index.scan(glob.glob('*.egg')) + arguments = [bdist_egg] + if setuptools.bootstrap_install_from: + args.insert(0, setuptools.bootstrap_install_from) + easy_install_command.args = arguments + easy_install_command.run() + setuptools.bootstrap_install_from = None + + +class BdistEggCustomName(bdist_egg.bdist_egg, EggNameMixin): + """Thin wrapper around the bdist_egg command to build with our custom name.""" + + def run(self): + bdist_egg.bdist_egg.run(self) + target = os.path.join(self.dist_dir, '{}.egg'.format(self.egg_name(True))) + shutil.move(self.get_outputs()[0], target) + + class SphinxDocumentation(setuptools.Command): """Command to generate documentation via sphinx.""" @@ -186,7 +318,13 @@ class BuildExt(build_ext.build_ext): if compiler in BuildExt.LINK_OPTIONS: for extension in self.extensions: extension.extra_link_args += list(BuildExt.LINK_OPTIONS[compiler]) - build_ext.build_ext.build_extensions(self) + try: + build_ext.build_ext.build_extensions(self) + except KeyboardInterrupt: + raise + except Exception as error: + support.diagnose_build_ext_error(self, error) + raise CommandError("Failed `build_ext` step.") class Gather(setuptools.Command): diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py index 7c7a6eebfc..f82c7f7fba 100644 --- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py +++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -29,7 +29,6 @@ """A thread pool that logs exceptions raised by tasks executed within it.""" -import functools import logging from concurrent import futures @@ -37,12 +36,12 @@ from concurrent import futures def _wrap(behavior): """Wraps an arbitrary callable behavior in exception-logging.""" - @functools.wraps(behavior) def _wrapping(*args, **kwargs): try: return behavior(*args, **kwargs) except Exception as e: - logging.exception('Unexpected exception from task run in logging pool!') + logging.exception( + 'Unexpected exception from %s executed in logging pool!', behavior) raise return _wrapping diff --git a/src/python/grpcio/support.py b/src/python/grpcio/support.py new file mode 100644 index 0000000000..bbc509653d --- /dev/null +++ b/src/python/grpcio/support.py @@ -0,0 +1,91 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import os +import os.path +import shutil +import sys +import tempfile + +from distutils import errors + +import commands + + +C_PYTHON_DEV = """ +#include <Python.h> +int main(int argc, char **argv) { return 0; } +""" +C_PYTHON_DEV_ERROR_MESSAGE = """ +Could not find <Python.h>. This could mean the following: + * You're on Ubuntu and haven't `apt-get install`ed `python-dev`. + * You're on Mac OS X and the usual Python framework was somehow corrupted + (check your environment variables or try re-installing?) + * You're on Windows and your Python installation was somehow corrupted + (check your environment variables or try re-installing?) + * Note: Windows users should look into installing `vcpython27`. +""" + +C_CHECKS = { + C_PYTHON_DEV: C_PYTHON_DEV_ERROR_MESSAGE, +} + +def _compile(compiler, source_string): + tempdir = tempfile.mkdtemp() + cpath = os.path.join(tempdir, 'a.c') + with open(cpath, 'w') as cfile: + cfile.write(source_string) + try: + compiler.compile([cpath]) + except errors.CompileError as error: + return error + finally: + shutil.rmtree(tempdir) + +def _expect_compile(compiler, source_string, error_message): + if _compile(compiler, source_string) is not None: + sys.stderr.write(error_message) + raise commands.CommandError( + "Diagnostics found a compilation environment issue:\n{}" + .format(error_message)) + +def diagnose_build_ext_error(build_ext, error): + { + errors.CompileError: diagnose_compile_error + }[type(error)](build_ext, error) + +def diagnose_compile_error(build_ext, error): + """Attempt to run a few test files through the compiler to see if we can + diagnose the reason for the compile failure.""" + for c_check, message in C_CHECKS.items(): + _expect_compile(build_ext.compiler, c_check, message) + raise commands.CommandError( + "\n\nWe could not diagnose your build failure. Please file an issue at " + "http://www.github.com/grpc/grpc with `[Python install]` in the title.") diff --git a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py index 452802da6a..0521e1c102 100644 --- a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py +++ b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -29,6 +29,7 @@ """Tests for grpc.framework.foundation.logging_pool.""" +import threading import unittest from grpc.framework.foundation import logging_pool @@ -36,6 +37,21 @@ from grpc.framework.foundation import logging_pool _POOL_SIZE = 16 +class _CallableObject(object): + + def __init__(self): + self._lock = threading.Lock() + self._passed_values = [] + + def __call__(self, value): + with self._lock: + self._passed_values.append(value) + + def passed_values(self): + with self._lock: + return tuple(self._passed_values) + + class LoggingPoolTest(unittest.TestCase): def testUpAndDown(self): @@ -59,6 +75,14 @@ class LoggingPoolTest(unittest.TestCase): self.assertIsNotNone(raised_exception) + def testCallableObjectExecuted(self): + callable_object = _CallableObject() + passed_object = object() + with logging_pool.pool(_POOL_SIZE) as pool: + future = pool.submit(callable_object, passed_object) + self.assertIsNone(future.result()) + self.assertSequenceEqual((passed_object,), callable_object.passed_values()) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py index 3bcefa601d..c8a3a1bc74 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -30,9 +30,12 @@ """Test code for the Face layer of RPC Framework.""" import abc +import itertools import unittest +from concurrent import futures # test_interfaces is referenced from specification in this module. +from grpc.framework.foundation import logging_pool from grpc.framework.interfaces.face import face from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control @@ -139,13 +142,50 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): test_messages.verify(second_request, second_response, self) - @unittest.skip('Parallel invocations impossible with blocking control flow!') def testParallelInvocations(self): - raise NotImplementedError() + pool = logging_pool.pool(test_constants.PARALLELISM) + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = [] + response_futures = [] + for _ in range(test_constants.PARALLELISM): + request = test_messages.request() + response_future = pool.submit( + self._invoker.blocking(group, method), request, + test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures.append(response_future) + + responses = [ + response_future.result() for response_future in response_futures] + + for request, response in zip(requests, responses): + test_messages.verify(request, response, self) + pool.shutdown(wait=True) - @unittest.skip('Parallel invocations impossible with blocking control flow!') def testWaitingForSomeButNotAllParallelInvocations(self): - raise NotImplementedError() + pool = logging_pool.pool(test_constants.PARALLELISM) + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = [] + response_futures_to_indices = {} + for index in range(test_constants.PARALLELISM): + request = test_messages.request() + response_future = pool.submit( + self._invoker.blocking(group, method), request, + test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures_to_indices[response_future] = index + + some_completed_response_futures_iterator = itertools.islice( + futures.as_completed(response_futures_to_indices), + test_constants.PARALLELISM / 2) + for response_future in some_completed_response_futures_iterator: + index = response_futures_to_indices[response_future] + test_messages.verify(requests[index], response_future.result(), self) + pool.shutdown(wait=True) @unittest.skip('Cancellation impossible with blocking control flow!') def testCancelledUnaryRequestUnaryResponse(self): diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index fc8daa992f..1d36a931e8 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -31,8 +31,10 @@ import abc import contextlib +import itertools import threading import unittest +from concurrent import futures # test_interfaces is referenced from specification in this module. from grpc.framework.foundation import logging_pool @@ -219,6 +221,23 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): test_messages.verify(second_request, second_response, self) + def testParallelInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self._invoker.future(group, method)( + first_request, test_constants.LONG_TIMEOUT) + second_response_future = self._invoker.future(group, method)( + second_request, test_constants.LONG_TIMEOUT) + first_response = first_response_future.result() + second_response = second_response_future.result() + + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + for (group, method), test_messages_sequence in ( self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: @@ -237,26 +256,28 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): for request, response in zip(requests, responses): test_messages.verify(request, response, self) - def testParallelInvocations(self): + def testWaitingForSomeButNotAllParallelInvocations(self): + pool = logging_pool.pool(test_constants.PARALLELISM) for (group, method), test_messages_sequence in ( self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - second_response = second_response_future.result() - - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - @unittest.skip('TODO(nathaniel): implement.') - def testWaitingForSomeButNotAllParallelInvocations(self): - raise NotImplementedError() + requests = [] + response_futures_to_indices = {} + for index in range(test_constants.PARALLELISM): + request = test_messages.request() + inner_response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + outer_response_future = pool.submit(inner_response_future.result) + requests.append(request) + response_futures_to_indices[outer_response_future] = index + + some_completed_response_futures_iterator = itertools.islice( + futures.as_completed(response_futures_to_indices), + test_constants.PARALLELISM / 2) + for response_future in some_completed_response_futures_iterator: + index = response_futures_to_indices[response_future] + test_messages.verify(requests[index], response_future.result(), self) + pool.shutdown(wait=True) def testCancelledUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py index 2e444ff09d..42a7f4e3b8 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -76,7 +76,7 @@ class Receiver(face.ResponseReceiver): def unary_response(self): with self._condition: if self._abortion is not None: - raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + raise AssertionError('Aborted: "{}"!'.format(self._abortion)) elif len(self._responses) != 1: raise AssertionError( '%d responses received, not exactly one!', len(self._responses)) @@ -88,7 +88,7 @@ class Receiver(face.ResponseReceiver): if self._abortion is None: return list(self._responses) else: - raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + raise AssertionError('Aborted: "{}"!'.format(self._abortion)) def abortion(self): with self._condition: |