diff options
author | Nathaniel Manista <nathaniel@google.com> | 2017-01-17 15:00:34 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-01-17 15:00:34 -0800 |
commit | fc4b07e10c0482522dbc6a01401ea8f1606a76b4 (patch) | |
tree | 622b321c1682d85950c8b9519c2f957b26e6e52e /src/python/grpcio_tests/tests/interop/client.py | |
parent | c0d7d67dfbe4f7d539e64365c4111b748285668f (diff) | |
parent | cc793703bfba6f661f523b6fec82ff8a913e1759 (diff) |
Merge pull request #9276 from soltanmm-google/remember-the-blue-flowers-they-are-important!
Enable yapf (Python formatting).
Diffstat (limited to 'src/python/grpcio_tests/tests/interop/client.py')
-rw-r--r-- | src/python/grpcio_tests/tests/interop/client.py | 163 |
1 files changed, 88 insertions, 75 deletions
diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py index afaa466254..f177896e8e 100644 --- a/src/python/grpcio_tests/tests/interop/client.py +++ b/src/python/grpcio_tests/tests/interop/client.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """The Python implementation of the GRPC interoperability test client.""" import argparse @@ -41,93 +40,107 @@ from tests.interop import resources def _args(): - parser = argparse.ArgumentParser() - parser.add_argument( - '--server_host', help='the host to which to connect', type=str, - default="127.0.0.1") - parser.add_argument( - '--server_port', help='the port to which to connect', type=int) - parser.add_argument( - '--test_case', help='the test case to execute', type=str, - default="large_unary") - parser.add_argument( - '--use_tls', help='require a secure connection', default=False, - type=resources.parse_bool) - parser.add_argument( - '--use_test_ca', help='replace platform root CAs with ca.pem', - default=False, type=resources.parse_bool) - parser.add_argument( - '--server_host_override', default="foo.test.google.fr", - help='the server host to which to claim to connect', type=str) - parser.add_argument('--oauth_scope', help='scope for OAuth tokens', type=str) - parser.add_argument( - '--default_service_account', - help='email address of the default service account', type=str) - return parser.parse_args() + parser = argparse.ArgumentParser() + parser.add_argument( + '--server_host', + help='the host to which to connect', + type=str, + default="127.0.0.1") + parser.add_argument( + '--server_port', help='the port to which to connect', type=int) + parser.add_argument( + '--test_case', + help='the test case to execute', + type=str, + default="large_unary") + parser.add_argument( + '--use_tls', + help='require a secure connection', + default=False, + type=resources.parse_bool) + parser.add_argument( + '--use_test_ca', + help='replace platform root CAs with ca.pem', + default=False, + type=resources.parse_bool) + parser.add_argument( + '--server_host_override', + default="foo.test.google.fr", + help='the server host to which to claim to connect', + type=str) + parser.add_argument( + '--oauth_scope', help='scope for OAuth tokens', type=str) + parser.add_argument( + '--default_service_account', + help='email address of the default service account', + type=str) + return parser.parse_args() def _application_default_credentials(): - return oauth2client_client.GoogleCredentials.get_application_default() + return oauth2client_client.GoogleCredentials.get_application_default() def _stub(args): - target = '{}:{}'.format(args.server_host, args.server_port) - if args.test_case == 'oauth2_auth_token': - google_credentials = _application_default_credentials() - scoped_credentials = google_credentials.create_scoped([args.oauth_scope]) - access_token = scoped_credentials.get_access_token().access_token - call_credentials = grpc.access_token_call_credentials(access_token) - elif args.test_case == 'compute_engine_creds': - google_credentials = _application_default_credentials() - scoped_credentials = google_credentials.create_scoped([args.oauth_scope]) - # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last - # remaining use of the Beta API. - call_credentials = implementations.google_call_credentials( - scoped_credentials) - elif args.test_case == 'jwt_token_creds': - google_credentials = _application_default_credentials() - # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last - # remaining use of the Beta API. - call_credentials = implementations.google_call_credentials( - google_credentials) - else: - call_credentials = None - if args.use_tls: - if args.use_test_ca: - root_certificates = resources.test_root_certificates() + target = '{}:{}'.format(args.server_host, args.server_port) + if args.test_case == 'oauth2_auth_token': + google_credentials = _application_default_credentials() + scoped_credentials = google_credentials.create_scoped( + [args.oauth_scope]) + access_token = scoped_credentials.get_access_token().access_token + call_credentials = grpc.access_token_call_credentials(access_token) + elif args.test_case == 'compute_engine_creds': + google_credentials = _application_default_credentials() + scoped_credentials = google_credentials.create_scoped( + [args.oauth_scope]) + # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last + # remaining use of the Beta API. + call_credentials = implementations.google_call_credentials( + scoped_credentials) + elif args.test_case == 'jwt_token_creds': + google_credentials = _application_default_credentials() + # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last + # remaining use of the Beta API. + call_credentials = implementations.google_call_credentials( + google_credentials) else: - root_certificates = None # will load default roots. - - channel_credentials = grpc.ssl_channel_credentials(root_certificates) - if call_credentials is not None: - channel_credentials = grpc.composite_channel_credentials( - channel_credentials, call_credentials) - - channel = grpc.secure_channel( - target, channel_credentials, - (('grpc.ssl_target_name_override', args.server_host_override,),)) - else: - channel = grpc.insecure_channel(target) - if args.test_case == "unimplemented_service": - return test_pb2.UnimplementedServiceStub(channel) - else: - return test_pb2.TestServiceStub(channel) + call_credentials = None + if args.use_tls: + if args.use_test_ca: + root_certificates = resources.test_root_certificates() + else: + root_certificates = None # will load default roots. + + channel_credentials = grpc.ssl_channel_credentials(root_certificates) + if call_credentials is not None: + channel_credentials = grpc.composite_channel_credentials( + channel_credentials, call_credentials) + + channel = grpc.secure_channel(target, channel_credentials, (( + 'grpc.ssl_target_name_override', + args.server_host_override,),)) + else: + channel = grpc.insecure_channel(target) + if args.test_case == "unimplemented_service": + return test_pb2.UnimplementedServiceStub(channel) + else: + return test_pb2.TestServiceStub(channel) def _test_case_from_arg(test_case_arg): - for test_case in methods.TestCase: - if test_case_arg == test_case.value: - return test_case - else: - raise ValueError('No test case "%s"!' % test_case_arg) + for test_case in methods.TestCase: + if test_case_arg == test_case.value: + return test_case + else: + raise ValueError('No test case "%s"!' % test_case_arg) def test_interoperability(): - args = _args() - stub = _stub(args) - test_case = _test_case_from_arg(args.test_case) - test_case.test_interoperability(stub, args) + args = _args() + stub = _stub(args) + test_case = _test_case_from_arg(args.test_case) + test_case.test_interoperability(stub, args) if __name__ == '__main__': - test_interoperability() + test_interoperability() |