diff options
author | Ken Payson <kpayson@google.com> | 2016-04-19 12:08:34 -0700 |
---|---|---|
committer | Ken Payson <kpayson@google.com> | 2016-04-28 20:01:06 -0700 |
commit | 0482c1046b837be17c3a056af6569d84b8f52c23 (patch) | |
tree | 4efa2d6271958cf8449e45ddb93ab70d18476d98 /src/python | |
parent | a4e7ecab1a512bae1c40b9b301661ba24677b0ed (diff) |
Python QPS Worker/initial scenarios
Diffstat (limited to 'src/python')
-rw-r--r-- | src/python/grpcio/tests/qps/__init__.py | 28 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/benchmark_client.py | 186 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/benchmark_server.py | 58 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/client_runner.py | 104 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/histogram.py | 85 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/qps_worker.py | 60 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/worker_server.py | 184 |
7 files changed, 705 insertions, 0 deletions
diff --git a/src/python/grpcio/tests/qps/__init__.py b/src/python/grpcio/tests/qps/__init__.py new file mode 100644 index 0000000000..100a624dc9 --- /dev/null +++ b/src/python/grpcio/tests/qps/__init__.py @@ -0,0 +1,28 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py new file mode 100644 index 0000000000..eed0b0c6da --- /dev/null +++ b/src/python/grpcio/tests/qps/benchmark_client.py @@ -0,0 +1,186 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Defines test client behaviors (UNARY/STREAMING) (SYNC/ASYNC).""" + +import abc +import time +try: + import Queue as queue # Python 2.x +except ImportError: + import queue # Python 3 + +from concurrent import futures + +from grpc.beta import implementations +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import services_pb2 +from tests.unit import resources +from tests.unit.beta import test_utilities + +_TIMEOUT = 60 * 60 * 24 + + +class BenchmarkClient: + """Benchmark client interface that exposes a non-blocking send_request().""" + + __metaclass__ = abc.ABCMeta + + def __init__(self, server, config, hist): + # Create the stub + host, port = server.split(':') + port = int(port) + if config.HasField('security_params'): + creds = implementations.ssl_channel_credentials( + resources.test_root_certificates()) + channel = test_utilities.not_really_secure_channel( + host, port, creds, config.security_params.server_host_override) + else: + channel = implementations.insecure_channel(host, port) + + if config.payload_config.WhichOneof('payload') == 'simple_params': + self._generic = False + self._stub = services_pb2.beta_create_BenchmarkService_stub(channel) + payload = messages_pb2.Payload( + body='\0' * config.payload_config.simple_params.req_size) + self._request = messages_pb2.SimpleRequest( + payload=payload, + response_size=config.payload_config.simple_params.resp_size) + else: + self._generic = True + self._stub = implementations.generic_stub(channel) + self._request = '\0' * config.payload_config.bytebuf_params.req_size + + self._hist = hist + self._response_callbacks = [] + + def add_response_callback(self, callback): + self._response_callbacks.append(callback) + + @abc.abstractmethod + def send_request(self): + """Non-blocking wrapper for a client's request operation.""" + raise NotImplementedError() + + def start(self): + pass + + def stop(self): + pass + + def _handle_response(self, query_time): + self._hist.add(query_time * 1e9) # Report times in nanoseconds + for callback in self._response_callbacks: + callback(query_time) + + +class UnarySyncBenchmarkClient(BenchmarkClient): + + def __init__(self, server, config, hist): + super(UnarySyncBenchmarkClient, self).__init__(server, config, hist) + self._pool = futures.ThreadPoolExecutor( + max_workers=config.outstanding_rpcs_per_channel) + + def send_request(self): + # Send requests in seperate threads to support multiple outstanding rpcs + # (See src/proto/grpc/testing/control.proto) + self._pool.submit(self._dispatch_request) + + def stop(self): + self._pool.shutdown(wait=True) + self._stub = None + + def _dispatch_request(self): + start_time = time.time() + self._stub.UnaryCall(self._request, _TIMEOUT) + end_time = time.time() + self._handle_response(end_time - start_time) + + +class UnaryAsyncBenchmarkClient(BenchmarkClient): + + def send_request(self): + # Use the Future callback api to support multiple outstanding rpcs + start_time = time.time() + response_future = self._stub.UnaryCall.future(self._request, _TIMEOUT) + response_future.add_done_callback( + lambda resp: self._response_received(start_time, resp)) + + def _response_received(self, start_time, resp): + resp.result() + end_time = time.time() + self._handle_response(end_time - start_time) + + def stop(self): + self._stub = None + + +class StreamingAsyncBenchmarkClient(BenchmarkClient): + + def __init__(self, server, config, hist): + super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist) + self._is_streaming = False + self._pool = futures.ThreadPoolExecutor(max_workers=1) + # Use a thread-safe queue to put requests on the stream + self._request_queue = queue.Queue() + self._send_time_queue = queue.Queue() + + def send_request(self): + self._send_time_queue.put(time.time()) + self._request_queue.put(self._request) + + def start(self): + self._is_streaming = True + self._pool.submit(self._request_stream) + + def stop(self): + self._is_streaming = False + self._pool.shutdown(wait=True) + self._stub = None + + def _request_stream(self): + self._is_streaming = True + if self._generic: + response_stream = self._stub.inline_stream_stream( + 'grpc.testing.BenchmarkService', 'StreamingCall', + self._request_generator(), _TIMEOUT) + else: + response_stream = self._stub.StreamingCall(self._request_generator(), + _TIMEOUT) + for _ in response_stream: + end_time = time.time() + self._handle_response(end_time - self._send_time_queue.get_nowait()) + + def _request_generator(self): + while self._is_streaming: + try: + request = self._request_queue.get(block=True, timeout=1.0) + yield request + except queue.Empty: + pass diff --git a/src/python/grpcio/tests/qps/benchmark_server.py b/src/python/grpcio/tests/qps/benchmark_server.py new file mode 100644 index 0000000000..8cbf480d58 --- /dev/null +++ b/src/python/grpcio/tests/qps/benchmark_server.py @@ -0,0 +1,58 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import services_pb2 + + +class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer): + """Synchronous Server implementation for the Benchmark service.""" + + def UnaryCall(self, request, context): + payload = messages_pb2.Payload(body='\0' * request.response_size) + return messages_pb2.SimpleResponse(payload=payload) + + def StreamingCall(self, request_iterator, context): + for request in request_iterator: + payload = messages_pb2.Payload(body='\0' * request.response_size) + yield messages_pb2.SimpleResponse(payload=payload) + + +class GenericBenchmarkServer(services_pb2.BetaBenchmarkServiceServicer): + """Generic Server implementation for the Benchmark service.""" + + def __init__(self, resp_size): + self._response = '\0' * resp_size + + def UnaryCall(self, request, context): + return self._response + + def StreamingCall(self, request_iterator, context): + for request in request_iterator: + yield self._response diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py new file mode 100644 index 0000000000..a36c30ccc0 --- /dev/null +++ b/src/python/grpcio/tests/qps/client_runner.py @@ -0,0 +1,104 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Defines behavior for WHEN clients send requests. + +Each client exposes a non-blocking send_request() method that the +ClientRunner invokes either periodically or in response to some event. +""" + +import abc +import thread +import time + + +class ClientRunner: + """Abstract interface for sending requests from clients.""" + + __metaclass__ = abc.ABCMeta + + def __init__(self, client): + self._client = client + + @abc.abstractmethod + def start(self): + raise NotImplementedError() + + @abc.abstractmethod + def stop(self): + raise NotImplementedError() + + +class OpenLoopClientRunner(ClientRunner): + + def __init__(self, client, interval_generator): + super(OpenLoopClientRunner, self).__init__(client) + self._is_running = False + self._interval_generator = interval_generator + + def start(self): + self._is_running = True + self._client.start() + thread.start_new_thread(self._dispatch_requests, ()) + + def stop(self): + self._is_running = False + self._client.stop() + self._client = None + + def _dispatch_requests(self): + while self._is_running: + self._client.send_request() + time.sleep(next(self._interval_generator)) + + +class ClosedLoopClientRunner(ClientRunner): + + def __init__(self, client, request_count): + super(ClosedLoopClientRunner, self).__init__(client) + self._is_running = False + self._request_count = request_count + # Send a new request on each response for closed loop + self._client.add_response_callback(self._send_request) + + def start(self): + self._is_running = True + for _ in xrange(self._request_count): + self._client.send_request() + self._client.start() + + def stop(self): + self._is_running = False + self._client.stop() + self._client = None + + def _send_request(self, response_time): + if self._is_running: + self._client.send_request() + diff --git a/src/python/grpcio/tests/qps/histogram.py b/src/python/grpcio/tests/qps/histogram.py new file mode 100644 index 0000000000..9a7b5eb2ba --- /dev/null +++ b/src/python/grpcio/tests/qps/histogram.py @@ -0,0 +1,85 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import math +import threading + +from src.proto.grpc.testing import stats_pb2 + + +class Histogram(object): + """Histogram class used for recording performance testing data. + + This class is thread safe. + """ + + def __init__(self, resolution, max_possible): + self._lock = threading.Lock() + self._resolution = resolution + self._max_possible = max_possible + self._sum = 0 + self._sum_of_squares = 0 + self.multiplier = 1.0 + self._resolution + self._count = 0 + self._min = self._max_possible + self._max = 0 + self._buckets = [0] * (self._bucket_for(self._max_possible) + 1) + + def reset(self): + with self._lock: + self._sum = 0 + self._sum_of_squares = 0 + self._count = 0 + self._min = self._max_possible + self._max = 0 + self._buckets = [0] * (self._bucket_for(self._max_possible) + 1) + + def add(self, val): + with self._lock: + self._sum += val + self._sum_of_squares += val * val + self._count += 1 + self._min = min(self._min, val) + self._max = max(self._max, val) + self._buckets[self._bucket_for(val)] += 1 + + def get_data(self): + with self._lock: + data = stats_pb2.HistogramData() + data.bucket.extend(self._buckets) + data.min_seen = self._min + data.max_seen = self._max + data.sum = self._sum + data.sum_of_squares = self._sum_of_squares + data.count = self._count + return data + + def _bucket_for(self, val): + val = min(val, self._max_possible) + return int(math.log(val, self.multiplier)) diff --git a/src/python/grpcio/tests/qps/qps_worker.py b/src/python/grpcio/tests/qps/qps_worker.py new file mode 100644 index 0000000000..3dda718638 --- /dev/null +++ b/src/python/grpcio/tests/qps/qps_worker.py @@ -0,0 +1,60 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""The entry point for the qps worker.""" + +import argparse +import time + +from src.proto.grpc.testing import services_pb2 + +from tests.qps import worker_server + + +def run_worker_server(port): + servicer = worker_server.WorkerServer() + server = services_pb2.beta_create_WorkerService_server(servicer) + server.add_insecure_port('[::]:{}'.format(port)) + server.start() + servicer.wait_for_quit() + # Drain outstanding requests for clean exit + time.sleep(2) + server.stop(0) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='gRPC Python performance testing worker') + parser.add_argument('--driver_port', + type=int, + dest='port', + help='The port the worker should listen on') + args = parser.parse_args() + + run_worker_server(args.port) diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py new file mode 100644 index 0000000000..0b3acc14e7 --- /dev/null +++ b/src/python/grpcio/tests/qps/worker_server.py @@ -0,0 +1,184 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import multiprocessing +import random +import threading +import time + +from grpc.beta import implementations +from grpc.framework.interfaces.face import utilities +from src.proto.grpc.testing import control_pb2 +from src.proto.grpc.testing import services_pb2 +from src.proto.grpc.testing import stats_pb2 + +from tests.qps import benchmark_client +from tests.qps import benchmark_server +from tests.qps import client_runner +from tests.qps import histogram +from tests.unit import resources + + +class WorkerServer(services_pb2.BetaWorkerServiceServicer): + """Python Worker Server implementation.""" + + def __init__(self): + self._quit_event = threading.Event() + + def RunServer(self, request_iterator, context): + config = next(request_iterator).setup + server, port = self._create_server(config) + cores = multiprocessing.cpu_count() + server.start() + start_time = time.time() + yield self._get_server_status(start_time, start_time, port, cores) + + for request in request_iterator: + end_time = time.time() + status = self._get_server_status(start_time, end_time, port, cores) + if request.mark.reset: + start_time = end_time + yield status + server.stop(0) + + def _get_server_status(self, start_time, end_time, port, cores): + end_time = time.time() + elapsed_time = end_time - start_time + stats = stats_pb2.ServerStats(time_elapsed=elapsed_time, + time_user=elapsed_time, + time_system=elapsed_time) + return control_pb2.ServerStatus(stats=stats, port=port, cores=cores) + + def _create_server(self, config): + if config.server_type == control_pb2.SYNC_SERVER: + servicer = benchmark_server.BenchmarkServer() + server = services_pb2.beta_create_BenchmarkService_server(servicer) + elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: + resp_size = config.payload_config.bytebuf_params.resp_size + servicer = benchmark_server.GenericBenchmarkServer(resp_size) + method_implementations = { + ('grpc.testing.BenchmarkService', 'StreamingCall'): + utilities.stream_stream_inline(servicer.StreamingCall), + ('grpc.testing.BenchmarkService', 'UnaryCall'): + utilities.unary_unary_inline(servicer.UnaryCall), + } + server = implementations.server(method_implementations) + else: + raise Exception('Unsupported server type {}'.format(config.server_type)) + + if config.HasField('security_params'): # Use SSL + server_creds = implementations.ssl_server_credentials([( + resources.private_key(), resources.certificate_chain())]) + port = server.add_secure_port('[::]:{}'.format(config.port), server_creds) + else: + port = server.add_insecure_port('[::]:{}'.format(config.port)) + + return (server, port) + + def RunClient(self, request_iterator, context): + config = next(request_iterator).setup + client_runners = [] + qps_data = histogram.Histogram(config.histogram_params.resolution, + config.histogram_params.max_possible) + start_time = time.time() + + # Create a client for each channel + for i in xrange(config.client_channels): + server = config.server_targets[i % len(config.server_targets)] + runner = self._create_client_runner(server, config, qps_data) + client_runners.append(runner) + runner.start() + + end_time = time.time() + yield self._get_client_status(start_time, end_time, qps_data) + + # Respond to stat requests + for request in request_iterator: + end_time = time.time() + status = self._get_client_status(start_time, end_time, qps_data) + if request.mark.reset: + qps_data.reset() + start_time = time.time() + yield status + + # Cleanup the clients + for runner in client_runners: + runner.stop() + + def _get_client_status(self, start_time, end_time, qps_data): + latencies = qps_data.get_data() + end_time = time.time() + elapsed_time = end_time - start_time + stats = stats_pb2.ClientStats(latencies=latencies, + time_elapsed=elapsed_time, + time_user=elapsed_time, + time_system=elapsed_time) + return control_pb2.ClientStatus(stats=stats) + + def _create_client_runner(self, server, config, qps_data): + if config.client_type == control_pb2.SYNC_CLIENT: + if config.rpc_type == control_pb2.UNARY: + client = benchmark_client.UnarySyncBenchmarkClient( + server, config, qps_data) + else: + raise Exception('STREAMING SYNC client not supported') + elif config.client_type == control_pb2.ASYNC_CLIENT: + if config.rpc_type == control_pb2.UNARY: + client = benchmark_client.UnaryAsyncBenchmarkClient( + server, config, qps_data) + elif config.rpc_type == control_pb2.STREAMING: + client = benchmark_client.StreamingAsyncBenchmarkClient( + server, config, qps_data) + else: + raise Exception('Unsupported client type {}'.format(config.client_type)) + + # In multi-channel tests, we split the load across all channels + load_factor = float(config.client_channels) + if config.load_params.WhichOneof('load') == 'closed_loop': + runner = client_runner.ClosedLoopClientRunner( + client, config.outstanding_rpcs_per_channel) + else: # Open loop Poisson + alpha = config.load_params.poisson.offered_load / load_factor + def poisson(): + while True: + yield random.expovariate(alpha) + + runner = client_runner.OpenLoopClientRunner(client, poisson()) + + return runner + + def CoreCount(self, request, context): + return control_pb2.CoreResponse(cores=multiprocessing.cpu_count()) + + def QuitWorker(self, request, context): + self._quit_event.set() + return control_pb2.Void() + + def wait_for_quit(self): + self._quit_event.wait() |