diff options
author | Craig Tiller <ctiller@google.com> | 2016-06-09 07:50:50 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-06-09 07:50:50 -0700 |
commit | 0ca68b7cb19d2735f965d283b9974d8cd42df51d (patch) | |
tree | 10abff4f54d5c109e8aa813a340729ba1ef2689f /src/python/grpcio | |
parent | 16ebf5a124026f8e9d59894b214d2e743590a93e (diff) | |
parent | 1bc2976a0f51e14d3525aecbf4b3450445ed2d1b (diff) |
Merge github.com:grpc/grpc into error
Diffstat (limited to 'src/python/grpcio')
-rw-r--r-- | src/python/grpcio/grpc/_auth.py | 15 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/imports.generated.c | 4 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/imports.generated.h | 7 | ||||
-rw-r--r-- | src/python/grpcio/grpc_core_dependencies.py | 4 | ||||
-rw-r--r-- | src/python/grpcio/tests/interop/client.py | 3 | ||||
-rw-r--r-- | src/python/grpcio/tests/interop/methods.py | 13 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/benchmark_client.py | 88 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/client_runner.py | 5 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/qps_worker.py | 4 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/worker_server.py | 5 |
10 files changed, 82 insertions, 66 deletions
diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py index 3ae00ca23a..dea3221c9d 100644 --- a/src/python/grpcio/grpc/_auth.py +++ b/src/python/grpcio/grpc/_auth.py @@ -29,6 +29,7 @@ """GRPCAuthMetadataPlugins for standard authentication.""" +import inspect from concurrent import futures import grpc @@ -46,9 +47,21 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin): self._credentials = credentials self._pool = futures.ThreadPoolExecutor(max_workers=1) + # Hack to determine if these are JWT creds and we need to pass + # additional_claims when getting a token + if 'additional_claims' in inspect.getargspec( + credentials.get_access_token).args: + self._is_jwt = True + else: + self._is_jwt = False + def __call__(self, context, callback): # MetadataPlugins cannot block (see grpc.beta.interfaces.py) - future = self._pool.submit(self._credentials.get_access_token) + if self._is_jwt: + future = self._pool.submit(self._credentials.get_access_token, + additional_claims={'aud': context.service_url}) + else: + future = self._pool.submit(self._credentials.get_access_token) future.add_done_callback(lambda x: self._get_token_callback(callback, x)) def _get_token_callback(self, callback, future): diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c index 1e1656c0ef..8437e74ba0 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.c +++ b/src/python/grpcio/grpc/_cython/imports.generated.c @@ -126,6 +126,8 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import; grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import; grpc_is_binary_header_type grpc_is_binary_header_import; grpc_call_error_to_string_type grpc_call_error_to_string_import; +grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; +grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import; grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import; grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import; grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import; @@ -399,6 +401,8 @@ void pygrpc_load_imports(HMODULE library) { grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal"); grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header"); grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string"); + grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd"); + grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd"); grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next"); grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator"); grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity"); diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index 7b9e94d05d..a14f4f5183 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -43,6 +43,7 @@ #include <grpc/census.h> #include <grpc/compression.h> #include <grpc/grpc.h> +#include <grpc/grpc_posix.h> #include <grpc/grpc_security.h> #include <grpc/impl/codegen/alloc.h> #include <grpc/impl/codegen/byte_buffer.h> @@ -328,6 +329,12 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import; typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error); extern grpc_call_error_to_string_type grpc_call_error_to_string_import; #define grpc_call_error_to_string grpc_call_error_to_string_import +typedef grpc_channel *(*grpc_insecure_channel_create_from_fd_type)(const char *target, int fd, const grpc_channel_args *args); +extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; +#define grpc_insecure_channel_create_from_fd grpc_insecure_channel_create_from_fd_import +typedef void(*grpc_server_add_insecure_channel_from_fd_type)(grpc_server *server, grpc_completion_queue *cq, int fd); +extern grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import; +#define grpc_server_add_insecure_channel_from_fd grpc_server_add_insecure_channel_from_fd_import typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it); extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import; #define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 4c207aaa42..0f9212f6a0 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -83,7 +83,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/connected_channel.c', 'src/core/lib/channel/http_client_filter.c', 'src/core/lib/channel/http_server_filter.c', - 'src/core/lib/compression/compression_algorithm.c', + 'src/core/lib/compression/compression.c', 'src/core/lib/compression/message_compress.c', 'src/core/lib/debug/trace.c', 'src/core/lib/http/format_request.c', @@ -233,7 +233,9 @@ CORE_SOURCE_FILES = [ 'src/core/ext/client_config/subchannel_index.c', 'src/core/ext/client_config/uri_parser.c', 'src/core/ext/transport/chttp2/server/insecure/server_chttp2.c', + 'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create.c', + 'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c', 'src/core/ext/lb_policy/grpclb/load_balancer_api.c', 'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c', 'third_party/nanopb/pb_common.c', diff --git a/src/python/grpcio/tests/interop/client.py b/src/python/grpcio/tests/interop/client.py index e3d5545a02..8aa1ce30c1 100644 --- a/src/python/grpcio/tests/interop/client.py +++ b/src/python/grpcio/tests/interop/client.py @@ -76,6 +76,9 @@ def _stub(args): creds = oauth2client_client.GoogleCredentials.get_application_default() scoped_creds = creds.create_scoped([args.oauth_scope]) call_creds = implementations.google_call_credentials(scoped_creds) + elif args.test_case == 'jwt_token_creds': + creds = oauth2client_client.GoogleCredentials.get_application_default() + call_creds = implementations.google_call_credentials(creds) else: call_creds = None if args.use_tls: diff --git a/src/python/grpcio/tests/interop/methods.py b/src/python/grpcio/tests/interop/methods.py index d5ef0c68bb..7eac511525 100644 --- a/src/python/grpcio/tests/interop/methods.py +++ b/src/python/grpcio/tests/interop/methods.py @@ -310,6 +310,16 @@ def _oauth2_auth_token(stub, args): (response.oauth_scope, args.oauth_scope)) +def _jwt_token_creds(stub, args): + json_key_filename = os.environ[ + oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] + wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] + response = _large_unary_common_behavior(stub, True, False) + if wanted_email != response.username: + raise ValueError( + 'expected username %s, got %s' % (wanted_email, response.username)) + + def _per_rpc_creds(stub, args): json_key_filename = os.environ[ oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] @@ -338,6 +348,7 @@ class TestCase(enum.Enum): EMPTY_STREAM = 'empty_stream' COMPUTE_ENGINE_CREDS = 'compute_engine_creds' OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' + JWT_TOKEN_CREDS = 'jwt_token_creds' PER_RPC_CREDS = 'per_rpc_creds' TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' @@ -364,6 +375,8 @@ class TestCase(enum.Enum): _compute_engine_creds(stub, args) elif self is TestCase.OAUTH2_AUTH_TOKEN: _oauth2_auth_token(stub, args) + elif self is TestCase.JWT_TOKEN_CREDS: + _jwt_token_creds(stub, args) elif self is TestCase.PER_RPC_CREDS: _per_rpc_creds(stub, args) else: diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py index b372ea01ad..e2922347f9 100644 --- a/src/python/grpcio/tests/qps/benchmark_client.py +++ b/src/python/grpcio/tests/qps/benchmark_client.py @@ -82,6 +82,7 @@ class BenchmarkClient: self._response_callbacks = [] def add_response_callback(self, callback): + """callback will be invoked as callback(client, query_time)""" self._response_callbacks.append(callback) @abc.abstractmethod @@ -95,10 +96,10 @@ class BenchmarkClient: def stop(self): pass - def _handle_response(self, query_time): + def _handle_response(self, client, query_time): self._hist.add(query_time * 1e9) # Report times in nanoseconds for callback in self._response_callbacks: - callback(query_time) + callback(client, query_time) class UnarySyncBenchmarkClient(BenchmarkClient): @@ -121,7 +122,7 @@ class UnarySyncBenchmarkClient(BenchmarkClient): start_time = time.time() self._stub.UnaryCall(self._request, _TIMEOUT) end_time = time.time() - self._handle_response(end_time - start_time) + self._handle_response(self, end_time - start_time) class UnaryAsyncBenchmarkClient(BenchmarkClient): @@ -136,19 +137,20 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient): def _response_received(self, start_time, resp): resp.result() end_time = time.time() - self._handle_response(end_time - start_time) + self._handle_response(self, end_time - start_time) def stop(self): self._stub = None -class StreamingSyncBenchmarkClient(BenchmarkClient): +class _SyncStream(object): - def __init__(self, server, config, hist): - super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) + def __init__(self, stub, generic, request, handle_response): + self._stub = stub + self._generic = generic + self._request = request + self._handle_response = handle_response self._is_streaming = False - self._pool = futures.ThreadPoolExecutor(max_workers=1) - # Use a thread-safe queue to put requests on the stream self._request_queue = queue.Queue() self._send_time_queue = queue.Queue() @@ -158,15 +160,6 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): def start(self): self._is_streaming = True - self._pool.submit(self._request_stream) - - def stop(self): - self._is_streaming = False - self._pool.shutdown(wait=True) - self._stub = None - - def _request_stream(self): - self._is_streaming = True if self._generic: stream_callable = self._stub.stream_stream( 'grpc.testing.BenchmarkService', 'StreamingCall') @@ -175,8 +168,11 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): response_stream = stream_callable(self._request_generator(), _TIMEOUT) for _ in response_stream: - end_time = time.time() - self._handle_response(end_time - self._send_time_queue.get_nowait()) + self._handle_response( + self, time.time() - self._send_time_queue.get_nowait()) + + def stop(self): + self._is_streaming = False def _request_generator(self): while self._is_streaming: @@ -187,46 +183,28 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): pass -class AsyncReceiver(face.ResponseReceiver): - """Receiver for async stream responses.""" - - def __init__(self, send_time_queue, response_handler): - self._send_time_queue = send_time_queue - self._response_handler = response_handler - - def initial_metadata(self, initial_mdetadata): - pass - - def response(self, response): - end_time = time.time() - self._response_handler(end_time - self._send_time_queue.get_nowait()) - - def complete(self, terminal_metadata, code, details): - pass - - -class StreamingAsyncBenchmarkClient(BenchmarkClient): +class StreamingSyncBenchmarkClient(BenchmarkClient): def __init__(self, server, config, hist): - super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist) - self._send_time_queue = queue.Queue() - self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response) - self._rendezvous = None + super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) + self._pool = futures.ThreadPoolExecutor( + max_workers=config.outstanding_rpcs_per_channel) + self._streams = [_SyncStream(self._stub, self._generic, + self._request, self._handle_response) + for _ in xrange(config.outstanding_rpcs_per_channel)] + self._curr_stream = 0 def send_request(self): - if self._rendezvous is not None: - self._send_time_queue.put(time.time()) - self._rendezvous.consume(self._request) + # Use a round_robin scheduler to determine what stream to send on + self._streams[self._curr_stream].send_request() + self._curr_stream = (self._curr_stream + 1) % len(self._streams) def start(self): - if self._generic: - stream_callable = self._stub.stream_stream( - 'grpc.testing.BenchmarkService', 'StreamingCall') - else: - stream_callable = self._stub.StreamingCall - self._rendezvous = stream_callable.event( - self._receiver, lambda *args: None, _TIMEOUT) + for stream in self._streams: + self._pool.submit(stream.start) def stop(self): - self._rendezvous.terminate() - self._rendezvous = None + for stream in self._streams: + stream.stop() + self._pool.shutdown(wait=True) + self._stub = None diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py index 1ede7d2af1..2d1d981733 100644 --- a/src/python/grpcio/tests/qps/client_runner.py +++ b/src/python/grpcio/tests/qps/client_runner.py @@ -98,7 +98,6 @@ class ClosedLoopClientRunner(ClientRunner): self._client.stop() self._client = None - def _send_request(self, response_time): + def _send_request(self, client, response_time): if self._is_running: - self._client.send_request() - + client.send_request() diff --git a/src/python/grpcio/tests/qps/qps_worker.py b/src/python/grpcio/tests/qps/qps_worker.py index 3dda718638..16926379a5 100644 --- a/src/python/grpcio/tests/qps/qps_worker.py +++ b/src/python/grpcio/tests/qps/qps_worker.py @@ -43,9 +43,7 @@ def run_worker_server(port): server.add_insecure_port('[::]:{}'.format(port)) server.start() servicer.wait_for_quit() - # Drain outstanding requests for clean exit - time.sleep(2) - server.stop(0) + server.stop(2) if __name__ == '__main__': diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py index 1f9af5482c..d41f8377c2 100644 --- a/src/python/grpcio/tests/qps/worker_server.py +++ b/src/python/grpcio/tests/qps/worker_server.py @@ -153,9 +153,8 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer): if config.rpc_type == control_pb2.UNARY: client = benchmark_client.UnaryAsyncBenchmarkClient( server, config, qps_data) - elif config.rpc_type == control_pb2.STREAMING: - client = benchmark_client.StreamingAsyncBenchmarkClient( - server, config, qps_data) + else: + raise Exception('Async streaming client not supported') else: raise Exception('Unsupported client type {}'.format(config.client_type)) |