aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio
diff options
context:
space:
mode:
authorGravatar Ken Payson <kpayson@google.com>2016-04-19 12:08:34 -0700
committerGravatar Ken Payson <kpayson@google.com>2016-04-28 20:01:06 -0700
commit0482c1046b837be17c3a056af6569d84b8f52c23 (patch)
tree4efa2d6271958cf8449e45ddb93ab70d18476d98 /src/python/grpcio
parenta4e7ecab1a512bae1c40b9b301661ba24677b0ed (diff)
Python QPS Worker/initial scenarios
Diffstat (limited to 'src/python/grpcio')
-rw-r--r--src/python/grpcio/tests/qps/__init__.py28
-rw-r--r--src/python/grpcio/tests/qps/benchmark_client.py186
-rw-r--r--src/python/grpcio/tests/qps/benchmark_server.py58
-rw-r--r--src/python/grpcio/tests/qps/client_runner.py104
-rw-r--r--src/python/grpcio/tests/qps/histogram.py85
-rw-r--r--src/python/grpcio/tests/qps/qps_worker.py60
-rw-r--r--src/python/grpcio/tests/qps/worker_server.py184
7 files changed, 705 insertions, 0 deletions
diff --git a/src/python/grpcio/tests/qps/__init__.py b/src/python/grpcio/tests/qps/__init__.py
new file mode 100644
index 0000000000..100a624dc9
--- /dev/null
+++ b/src/python/grpcio/tests/qps/__init__.py
@@ -0,0 +1,28 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py
new file mode 100644
index 0000000000..eed0b0c6da
--- /dev/null
+++ b/src/python/grpcio/tests/qps/benchmark_client.py
@@ -0,0 +1,186 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Defines test client behaviors (UNARY/STREAMING) (SYNC/ASYNC)."""
+
+import abc
+import time
+try:
+ import Queue as queue # Python 2.x
+except ImportError:
+ import queue # Python 3
+
+from concurrent import futures
+
+from grpc.beta import implementations
+from src.proto.grpc.testing import messages_pb2
+from src.proto.grpc.testing import services_pb2
+from tests.unit import resources
+from tests.unit.beta import test_utilities
+
+_TIMEOUT = 60 * 60 * 24
+
+
+class BenchmarkClient:
+ """Benchmark client interface that exposes a non-blocking send_request()."""
+
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self, server, config, hist):
+ # Create the stub
+ host, port = server.split(':')
+ port = int(port)
+ if config.HasField('security_params'):
+ creds = implementations.ssl_channel_credentials(
+ resources.test_root_certificates())
+ channel = test_utilities.not_really_secure_channel(
+ host, port, creds, config.security_params.server_host_override)
+ else:
+ channel = implementations.insecure_channel(host, port)
+
+ if config.payload_config.WhichOneof('payload') == 'simple_params':
+ self._generic = False
+ self._stub = services_pb2.beta_create_BenchmarkService_stub(channel)
+ payload = messages_pb2.Payload(
+ body='\0' * config.payload_config.simple_params.req_size)
+ self._request = messages_pb2.SimpleRequest(
+ payload=payload,
+ response_size=config.payload_config.simple_params.resp_size)
+ else:
+ self._generic = True
+ self._stub = implementations.generic_stub(channel)
+ self._request = '\0' * config.payload_config.bytebuf_params.req_size
+
+ self._hist = hist
+ self._response_callbacks = []
+
+ def add_response_callback(self, callback):
+ self._response_callbacks.append(callback)
+
+ @abc.abstractmethod
+ def send_request(self):
+ """Non-blocking wrapper for a client's request operation."""
+ raise NotImplementedError()
+
+ def start(self):
+ pass
+
+ def stop(self):
+ pass
+
+ def _handle_response(self, query_time):
+ self._hist.add(query_time * 1e9) # Report times in nanoseconds
+ for callback in self._response_callbacks:
+ callback(query_time)
+
+
+class UnarySyncBenchmarkClient(BenchmarkClient):
+
+ def __init__(self, server, config, hist):
+ super(UnarySyncBenchmarkClient, self).__init__(server, config, hist)
+ self._pool = futures.ThreadPoolExecutor(
+ max_workers=config.outstanding_rpcs_per_channel)
+
+ def send_request(self):
+ # Send requests in seperate threads to support multiple outstanding rpcs
+ # (See src/proto/grpc/testing/control.proto)
+ self._pool.submit(self._dispatch_request)
+
+ def stop(self):
+ self._pool.shutdown(wait=True)
+ self._stub = None
+
+ def _dispatch_request(self):
+ start_time = time.time()
+ self._stub.UnaryCall(self._request, _TIMEOUT)
+ end_time = time.time()
+ self._handle_response(end_time - start_time)
+
+
+class UnaryAsyncBenchmarkClient(BenchmarkClient):
+
+ def send_request(self):
+ # Use the Future callback api to support multiple outstanding rpcs
+ start_time = time.time()
+ response_future = self._stub.UnaryCall.future(self._request, _TIMEOUT)
+ response_future.add_done_callback(
+ lambda resp: self._response_received(start_time, resp))
+
+ def _response_received(self, start_time, resp):
+ resp.result()
+ end_time = time.time()
+ self._handle_response(end_time - start_time)
+
+ def stop(self):
+ self._stub = None
+
+
+class StreamingAsyncBenchmarkClient(BenchmarkClient):
+
+ def __init__(self, server, config, hist):
+ super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
+ self._is_streaming = False
+ self._pool = futures.ThreadPoolExecutor(max_workers=1)
+ # Use a thread-safe queue to put requests on the stream
+ self._request_queue = queue.Queue()
+ self._send_time_queue = queue.Queue()
+
+ def send_request(self):
+ self._send_time_queue.put(time.time())
+ self._request_queue.put(self._request)
+
+ def start(self):
+ self._is_streaming = True
+ self._pool.submit(self._request_stream)
+
+ def stop(self):
+ self._is_streaming = False
+ self._pool.shutdown(wait=True)
+ self._stub = None
+
+ def _request_stream(self):
+ self._is_streaming = True
+ if self._generic:
+ response_stream = self._stub.inline_stream_stream(
+ 'grpc.testing.BenchmarkService', 'StreamingCall',
+ self._request_generator(), _TIMEOUT)
+ else:
+ response_stream = self._stub.StreamingCall(self._request_generator(),
+ _TIMEOUT)
+ for _ in response_stream:
+ end_time = time.time()
+ self._handle_response(end_time - self._send_time_queue.get_nowait())
+
+ def _request_generator(self):
+ while self._is_streaming:
+ try:
+ request = self._request_queue.get(block=True, timeout=1.0)
+ yield request
+ except queue.Empty:
+ pass
diff --git a/src/python/grpcio/tests/qps/benchmark_server.py b/src/python/grpcio/tests/qps/benchmark_server.py
new file mode 100644
index 0000000000..8cbf480d58
--- /dev/null
+++ b/src/python/grpcio/tests/qps/benchmark_server.py
@@ -0,0 +1,58 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+from src.proto.grpc.testing import messages_pb2
+from src.proto.grpc.testing import services_pb2
+
+
+class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer):
+ """Synchronous Server implementation for the Benchmark service."""
+
+ def UnaryCall(self, request, context):
+ payload = messages_pb2.Payload(body='\0' * request.response_size)
+ return messages_pb2.SimpleResponse(payload=payload)
+
+ def StreamingCall(self, request_iterator, context):
+ for request in request_iterator:
+ payload = messages_pb2.Payload(body='\0' * request.response_size)
+ yield messages_pb2.SimpleResponse(payload=payload)
+
+
+class GenericBenchmarkServer(services_pb2.BetaBenchmarkServiceServicer):
+ """Generic Server implementation for the Benchmark service."""
+
+ def __init__(self, resp_size):
+ self._response = '\0' * resp_size
+
+ def UnaryCall(self, request, context):
+ return self._response
+
+ def StreamingCall(self, request_iterator, context):
+ for request in request_iterator:
+ yield self._response
diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py
new file mode 100644
index 0000000000..a36c30ccc0
--- /dev/null
+++ b/src/python/grpcio/tests/qps/client_runner.py
@@ -0,0 +1,104 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Defines behavior for WHEN clients send requests.
+
+Each client exposes a non-blocking send_request() method that the
+ClientRunner invokes either periodically or in response to some event.
+"""
+
+import abc
+import thread
+import time
+
+
+class ClientRunner:
+ """Abstract interface for sending requests from clients."""
+
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self, client):
+ self._client = client
+
+ @abc.abstractmethod
+ def start(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stop(self):
+ raise NotImplementedError()
+
+
+class OpenLoopClientRunner(ClientRunner):
+
+ def __init__(self, client, interval_generator):
+ super(OpenLoopClientRunner, self).__init__(client)
+ self._is_running = False
+ self._interval_generator = interval_generator
+
+ def start(self):
+ self._is_running = True
+ self._client.start()
+ thread.start_new_thread(self._dispatch_requests, ())
+
+ def stop(self):
+ self._is_running = False
+ self._client.stop()
+ self._client = None
+
+ def _dispatch_requests(self):
+ while self._is_running:
+ self._client.send_request()
+ time.sleep(next(self._interval_generator))
+
+
+class ClosedLoopClientRunner(ClientRunner):
+
+ def __init__(self, client, request_count):
+ super(ClosedLoopClientRunner, self).__init__(client)
+ self._is_running = False
+ self._request_count = request_count
+ # Send a new request on each response for closed loop
+ self._client.add_response_callback(self._send_request)
+
+ def start(self):
+ self._is_running = True
+ for _ in xrange(self._request_count):
+ self._client.send_request()
+ self._client.start()
+
+ def stop(self):
+ self._is_running = False
+ self._client.stop()
+ self._client = None
+
+ def _send_request(self, response_time):
+ if self._is_running:
+ self._client.send_request()
+
diff --git a/src/python/grpcio/tests/qps/histogram.py b/src/python/grpcio/tests/qps/histogram.py
new file mode 100644
index 0000000000..9a7b5eb2ba
--- /dev/null
+++ b/src/python/grpcio/tests/qps/histogram.py
@@ -0,0 +1,85 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import math
+import threading
+
+from src.proto.grpc.testing import stats_pb2
+
+
+class Histogram(object):
+ """Histogram class used for recording performance testing data.
+
+ This class is thread safe.
+ """
+
+ def __init__(self, resolution, max_possible):
+ self._lock = threading.Lock()
+ self._resolution = resolution
+ self._max_possible = max_possible
+ self._sum = 0
+ self._sum_of_squares = 0
+ self.multiplier = 1.0 + self._resolution
+ self._count = 0
+ self._min = self._max_possible
+ self._max = 0
+ self._buckets = [0] * (self._bucket_for(self._max_possible) + 1)
+
+ def reset(self):
+ with self._lock:
+ self._sum = 0
+ self._sum_of_squares = 0
+ self._count = 0
+ self._min = self._max_possible
+ self._max = 0
+ self._buckets = [0] * (self._bucket_for(self._max_possible) + 1)
+
+ def add(self, val):
+ with self._lock:
+ self._sum += val
+ self._sum_of_squares += val * val
+ self._count += 1
+ self._min = min(self._min, val)
+ self._max = max(self._max, val)
+ self._buckets[self._bucket_for(val)] += 1
+
+ def get_data(self):
+ with self._lock:
+ data = stats_pb2.HistogramData()
+ data.bucket.extend(self._buckets)
+ data.min_seen = self._min
+ data.max_seen = self._max
+ data.sum = self._sum
+ data.sum_of_squares = self._sum_of_squares
+ data.count = self._count
+ return data
+
+ def _bucket_for(self, val):
+ val = min(val, self._max_possible)
+ return int(math.log(val, self.multiplier))
diff --git a/src/python/grpcio/tests/qps/qps_worker.py b/src/python/grpcio/tests/qps/qps_worker.py
new file mode 100644
index 0000000000..3dda718638
--- /dev/null
+++ b/src/python/grpcio/tests/qps/qps_worker.py
@@ -0,0 +1,60 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""The entry point for the qps worker."""
+
+import argparse
+import time
+
+from src.proto.grpc.testing import services_pb2
+
+from tests.qps import worker_server
+
+
+def run_worker_server(port):
+ servicer = worker_server.WorkerServer()
+ server = services_pb2.beta_create_WorkerService_server(servicer)
+ server.add_insecure_port('[::]:{}'.format(port))
+ server.start()
+ servicer.wait_for_quit()
+ # Drain outstanding requests for clean exit
+ time.sleep(2)
+ server.stop(0)
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ description='gRPC Python performance testing worker')
+ parser.add_argument('--driver_port',
+ type=int,
+ dest='port',
+ help='The port the worker should listen on')
+ args = parser.parse_args()
+
+ run_worker_server(args.port)
diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py
new file mode 100644
index 0000000000..0b3acc14e7
--- /dev/null
+++ b/src/python/grpcio/tests/qps/worker_server.py
@@ -0,0 +1,184 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import multiprocessing
+import random
+import threading
+import time
+
+from grpc.beta import implementations
+from grpc.framework.interfaces.face import utilities
+from src.proto.grpc.testing import control_pb2
+from src.proto.grpc.testing import services_pb2
+from src.proto.grpc.testing import stats_pb2
+
+from tests.qps import benchmark_client
+from tests.qps import benchmark_server
+from tests.qps import client_runner
+from tests.qps import histogram
+from tests.unit import resources
+
+
+class WorkerServer(services_pb2.BetaWorkerServiceServicer):
+ """Python Worker Server implementation."""
+
+ def __init__(self):
+ self._quit_event = threading.Event()
+
+ def RunServer(self, request_iterator, context):
+ config = next(request_iterator).setup
+ server, port = self._create_server(config)
+ cores = multiprocessing.cpu_count()
+ server.start()
+ start_time = time.time()
+ yield self._get_server_status(start_time, start_time, port, cores)
+
+ for request in request_iterator:
+ end_time = time.time()
+ status = self._get_server_status(start_time, end_time, port, cores)
+ if request.mark.reset:
+ start_time = end_time
+ yield status
+ server.stop(0)
+
+ def _get_server_status(self, start_time, end_time, port, cores):
+ end_time = time.time()
+ elapsed_time = end_time - start_time
+ stats = stats_pb2.ServerStats(time_elapsed=elapsed_time,
+ time_user=elapsed_time,
+ time_system=elapsed_time)
+ return control_pb2.ServerStatus(stats=stats, port=port, cores=cores)
+
+ def _create_server(self, config):
+ if config.server_type == control_pb2.SYNC_SERVER:
+ servicer = benchmark_server.BenchmarkServer()
+ server = services_pb2.beta_create_BenchmarkService_server(servicer)
+ elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
+ resp_size = config.payload_config.bytebuf_params.resp_size
+ servicer = benchmark_server.GenericBenchmarkServer(resp_size)
+ method_implementations = {
+ ('grpc.testing.BenchmarkService', 'StreamingCall'):
+ utilities.stream_stream_inline(servicer.StreamingCall),
+ ('grpc.testing.BenchmarkService', 'UnaryCall'):
+ utilities.unary_unary_inline(servicer.UnaryCall),
+ }
+ server = implementations.server(method_implementations)
+ else:
+ raise Exception('Unsupported server type {}'.format(config.server_type))
+
+ if config.HasField('security_params'): # Use SSL
+ server_creds = implementations.ssl_server_credentials([(
+ resources.private_key(), resources.certificate_chain())])
+ port = server.add_secure_port('[::]:{}'.format(config.port), server_creds)
+ else:
+ port = server.add_insecure_port('[::]:{}'.format(config.port))
+
+ return (server, port)
+
+ def RunClient(self, request_iterator, context):
+ config = next(request_iterator).setup
+ client_runners = []
+ qps_data = histogram.Histogram(config.histogram_params.resolution,
+ config.histogram_params.max_possible)
+ start_time = time.time()
+
+ # Create a client for each channel
+ for i in xrange(config.client_channels):
+ server = config.server_targets[i % len(config.server_targets)]
+ runner = self._create_client_runner(server, config, qps_data)
+ client_runners.append(runner)
+ runner.start()
+
+ end_time = time.time()
+ yield self._get_client_status(start_time, end_time, qps_data)
+
+ # Respond to stat requests
+ for request in request_iterator:
+ end_time = time.time()
+ status = self._get_client_status(start_time, end_time, qps_data)
+ if request.mark.reset:
+ qps_data.reset()
+ start_time = time.time()
+ yield status
+
+ # Cleanup the clients
+ for runner in client_runners:
+ runner.stop()
+
+ def _get_client_status(self, start_time, end_time, qps_data):
+ latencies = qps_data.get_data()
+ end_time = time.time()
+ elapsed_time = end_time - start_time
+ stats = stats_pb2.ClientStats(latencies=latencies,
+ time_elapsed=elapsed_time,
+ time_user=elapsed_time,
+ time_system=elapsed_time)
+ return control_pb2.ClientStatus(stats=stats)
+
+ def _create_client_runner(self, server, config, qps_data):
+ if config.client_type == control_pb2.SYNC_CLIENT:
+ if config.rpc_type == control_pb2.UNARY:
+ client = benchmark_client.UnarySyncBenchmarkClient(
+ server, config, qps_data)
+ else:
+ raise Exception('STREAMING SYNC client not supported')
+ elif config.client_type == control_pb2.ASYNC_CLIENT:
+ if config.rpc_type == control_pb2.UNARY:
+ client = benchmark_client.UnaryAsyncBenchmarkClient(
+ server, config, qps_data)
+ elif config.rpc_type == control_pb2.STREAMING:
+ client = benchmark_client.StreamingAsyncBenchmarkClient(
+ server, config, qps_data)
+ else:
+ raise Exception('Unsupported client type {}'.format(config.client_type))
+
+ # In multi-channel tests, we split the load across all channels
+ load_factor = float(config.client_channels)
+ if config.load_params.WhichOneof('load') == 'closed_loop':
+ runner = client_runner.ClosedLoopClientRunner(
+ client, config.outstanding_rpcs_per_channel)
+ else: # Open loop Poisson
+ alpha = config.load_params.poisson.offered_load / load_factor
+ def poisson():
+ while True:
+ yield random.expovariate(alpha)
+
+ runner = client_runner.OpenLoopClientRunner(client, poisson())
+
+ return runner
+
+ def CoreCount(self, request, context):
+ return control_pb2.CoreResponse(cores=multiprocessing.cpu_count())
+
+ def QuitWorker(self, request, context):
+ self._quit_event.set()
+ return control_pb2.Void()
+
+ def wait_for_quit(self):
+ self._quit_event.wait()