aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/tests/qps
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/tests/qps')
-rw-r--r--src/python/grpcio/tests/qps/__init__.py28
-rw-r--r--src/python/grpcio/tests/qps/benchmark_client.py216
-rw-r--r--src/python/grpcio/tests/qps/benchmark_server.py58
-rw-r--r--src/python/grpcio/tests/qps/client_runner.py106
-rw-r--r--src/python/grpcio/tests/qps/histogram.py85
-rw-r--r--src/python/grpcio/tests/qps/qps_worker.py58
-rw-r--r--src/python/grpcio/tests/qps/worker_server.py184
7 files changed, 0 insertions, 735 deletions
diff --git a/src/python/grpcio/tests/qps/__init__.py b/src/python/grpcio/tests/qps/__init__.py
deleted file mode 100644
index 100a624dc9..0000000000
--- a/src/python/grpcio/tests/qps/__init__.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# Copyright 2016, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py
deleted file mode 100644
index 080281415d..0000000000
--- a/src/python/grpcio/tests/qps/benchmark_client.py
+++ /dev/null
@@ -1,216 +0,0 @@
-# Copyright 2016, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Defines test client behaviors (UNARY/STREAMING) (SYNC/ASYNC)."""
-
-import abc
-import threading
-import time
-
-from concurrent import futures
-from six.moves import queue
-
-import grpc
-from grpc.beta import implementations
-from grpc.framework.interfaces.face import face
-from src.proto.grpc.testing import messages_pb2
-from src.proto.grpc.testing import services_pb2
-from tests.unit import resources
-from tests.unit.beta import test_utilities
-
-_TIMEOUT = 60 * 60 * 24
-
-
-class BenchmarkClient:
- """Benchmark client interface that exposes a non-blocking send_request()."""
-
- __metaclass__ = abc.ABCMeta
-
- def __init__(self, server, config, hist):
- # Create the stub
- host, port = server.split(':')
- port = int(port)
- if config.HasField('security_params'):
- creds = implementations.ssl_channel_credentials(
- resources.test_root_certificates())
- channel = test_utilities.not_really_secure_channel(
- host, port, creds, config.security_params.server_host_override)
- else:
- channel = implementations.insecure_channel(host, port)
-
- connected_event = threading.Event()
- def wait_for_ready(connectivity):
- if connectivity == grpc.ChannelConnectivity.READY:
- connected_event.set()
- channel.subscribe(wait_for_ready, try_to_connect=True)
- connected_event.wait()
-
- if config.payload_config.WhichOneof('payload') == 'simple_params':
- self._generic = False
- self._stub = services_pb2.beta_create_BenchmarkService_stub(channel)
- payload = messages_pb2.Payload(
- body='\0' * config.payload_config.simple_params.req_size)
- self._request = messages_pb2.SimpleRequest(
- payload=payload,
- response_size=config.payload_config.simple_params.resp_size)
- else:
- self._generic = True
- self._stub = implementations.generic_stub(channel)
- self._request = '\0' * config.payload_config.bytebuf_params.req_size
-
- self._hist = hist
- self._response_callbacks = []
-
- def add_response_callback(self, callback):
- """callback will be invoked as callback(client, query_time)"""
- self._response_callbacks.append(callback)
-
- @abc.abstractmethod
- def send_request(self):
- """Non-blocking wrapper for a client's request operation."""
- raise NotImplementedError()
-
- def start(self):
- pass
-
- def stop(self):
- pass
-
- def _handle_response(self, client, query_time):
- self._hist.add(query_time * 1e9) # Report times in nanoseconds
- for callback in self._response_callbacks:
- callback(client, query_time)
-
-
-class UnarySyncBenchmarkClient(BenchmarkClient):
-
- def __init__(self, server, config, hist):
- super(UnarySyncBenchmarkClient, self).__init__(server, config, hist)
- self._pool = futures.ThreadPoolExecutor(
- max_workers=config.outstanding_rpcs_per_channel)
-
- def send_request(self):
- # Send requests in seperate threads to support multiple outstanding rpcs
- # (See src/proto/grpc/testing/control.proto)
- self._pool.submit(self._dispatch_request)
-
- def stop(self):
- self._pool.shutdown(wait=True)
- self._stub = None
-
- def _dispatch_request(self):
- start_time = time.time()
- self._stub.UnaryCall(self._request, _TIMEOUT)
- end_time = time.time()
- self._handle_response(self, end_time - start_time)
-
-
-class UnaryAsyncBenchmarkClient(BenchmarkClient):
-
- def send_request(self):
- # Use the Future callback api to support multiple outstanding rpcs
- start_time = time.time()
- response_future = self._stub.UnaryCall.future(self._request, _TIMEOUT)
- response_future.add_done_callback(
- lambda resp: self._response_received(start_time, resp))
-
- def _response_received(self, start_time, resp):
- resp.result()
- end_time = time.time()
- self._handle_response(self, end_time - start_time)
-
- def stop(self):
- self._stub = None
-
-
-class _SyncStream(object):
-
- def __init__(self, stub, generic, request, handle_response):
- self._stub = stub
- self._generic = generic
- self._request = request
- self._handle_response = handle_response
- self._is_streaming = False
- self._request_queue = queue.Queue()
- self._send_time_queue = queue.Queue()
-
- def send_request(self):
- self._send_time_queue.put(time.time())
- self._request_queue.put(self._request)
-
- def start(self):
- self._is_streaming = True
- if self._generic:
- stream_callable = self._stub.stream_stream(
- 'grpc.testing.BenchmarkService', 'StreamingCall')
- else:
- stream_callable = self._stub.StreamingCall
-
- response_stream = stream_callable(self._request_generator(), _TIMEOUT)
- for _ in response_stream:
- self._handle_response(
- self, time.time() - self._send_time_queue.get_nowait())
-
- def stop(self):
- self._is_streaming = False
-
- def _request_generator(self):
- while self._is_streaming:
- try:
- request = self._request_queue.get(block=True, timeout=1.0)
- yield request
- except queue.Empty:
- pass
-
-
-class StreamingSyncBenchmarkClient(BenchmarkClient):
-
- def __init__(self, server, config, hist):
- super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
- self._pool = futures.ThreadPoolExecutor(
- max_workers=config.outstanding_rpcs_per_channel)
- self._streams = [_SyncStream(self._stub, self._generic,
- self._request, self._handle_response)
- for _ in xrange(config.outstanding_rpcs_per_channel)]
- self._curr_stream = 0
-
- def send_request(self):
- # Use a round_robin scheduler to determine what stream to send on
- self._streams[self._curr_stream].send_request()
- self._curr_stream = (self._curr_stream + 1) % len(self._streams)
-
- def start(self):
- for stream in self._streams:
- self._pool.submit(stream.start)
-
- def stop(self):
- for stream in self._streams:
- stream.stop()
- self._pool.shutdown(wait=True)
- self._stub = None
diff --git a/src/python/grpcio/tests/qps/benchmark_server.py b/src/python/grpcio/tests/qps/benchmark_server.py
deleted file mode 100644
index 8cbf480d58..0000000000
--- a/src/python/grpcio/tests/qps/benchmark_server.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright 2016, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-from src.proto.grpc.testing import messages_pb2
-from src.proto.grpc.testing import services_pb2
-
-
-class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer):
- """Synchronous Server implementation for the Benchmark service."""
-
- def UnaryCall(self, request, context):
- payload = messages_pb2.Payload(body='\0' * request.response_size)
- return messages_pb2.SimpleResponse(payload=payload)
-
- def StreamingCall(self, request_iterator, context):
- for request in request_iterator:
- payload = messages_pb2.Payload(body='\0' * request.response_size)
- yield messages_pb2.SimpleResponse(payload=payload)
-
-
-class GenericBenchmarkServer(services_pb2.BetaBenchmarkServiceServicer):
- """Generic Server implementation for the Benchmark service."""
-
- def __init__(self, resp_size):
- self._response = '\0' * resp_size
-
- def UnaryCall(self, request, context):
- return self._response
-
- def StreamingCall(self, request_iterator, context):
- for request in request_iterator:
- yield self._response
diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py
deleted file mode 100644
index 1fd58687ad..0000000000
--- a/src/python/grpcio/tests/qps/client_runner.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Copyright 2016, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Defines behavior for WHEN clients send requests.
-
-Each client exposes a non-blocking send_request() method that the
-ClientRunner invokes either periodically or in response to some event.
-"""
-
-import abc
-import threading
-import time
-
-
-class ClientRunner:
- """Abstract interface for sending requests from clients."""
-
- __metaclass__ = abc.ABCMeta
-
- def __init__(self, client):
- self._client = client
-
- @abc.abstractmethod
- def start(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def stop(self):
- raise NotImplementedError()
-
-
-class OpenLoopClientRunner(ClientRunner):
-
- def __init__(self, client, interval_generator):
- super(OpenLoopClientRunner, self).__init__(client)
- self._is_running = False
- self._interval_generator = interval_generator
- self._dispatch_thread = threading.Thread(
- target=self._dispatch_requests, args=())
-
- def start(self):
- self._is_running = True
- self._client.start()
- self._dispatch_thread.start()
-
- def stop(self):
- self._is_running = False
- self._client.stop()
- self._dispatch_thread.join()
- self._client = None
-
- def _dispatch_requests(self):
- while self._is_running:
- self._client.send_request()
- time.sleep(next(self._interval_generator))
-
-
-class ClosedLoopClientRunner(ClientRunner):
-
- def __init__(self, client, request_count):
- super(ClosedLoopClientRunner, self).__init__(client)
- self._is_running = False
- self._request_count = request_count
- # Send a new request on each response for closed loop
- self._client.add_response_callback(self._send_request)
-
- def start(self):
- self._is_running = True
- self._client.start()
- for _ in xrange(self._request_count):
- self._client.send_request()
-
- def stop(self):
- self._is_running = False
- self._client.stop()
- self._client = None
-
- def _send_request(self, client, response_time):
- if self._is_running:
- client.send_request()
diff --git a/src/python/grpcio/tests/qps/histogram.py b/src/python/grpcio/tests/qps/histogram.py
deleted file mode 100644
index 9a7b5eb2ba..0000000000
--- a/src/python/grpcio/tests/qps/histogram.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# Copyright 2016, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import math
-import threading
-
-from src.proto.grpc.testing import stats_pb2
-
-
-class Histogram(object):
- """Histogram class used for recording performance testing data.
-
- This class is thread safe.
- """
-
- def __init__(self, resolution, max_possible):
- self._lock = threading.Lock()
- self._resolution = resolution
- self._max_possible = max_possible
- self._sum = 0
- self._sum_of_squares = 0
- self.multiplier = 1.0 + self._resolution
- self._count = 0
- self._min = self._max_possible
- self._max = 0
- self._buckets = [0] * (self._bucket_for(self._max_possible) + 1)
-
- def reset(self):
- with self._lock:
- self._sum = 0
- self._sum_of_squares = 0
- self._count = 0
- self._min = self._max_possible
- self._max = 0
- self._buckets = [0] * (self._bucket_for(self._max_possible) + 1)
-
- def add(self, val):
- with self._lock:
- self._sum += val
- self._sum_of_squares += val * val
- self._count += 1
- self._min = min(self._min, val)
- self._max = max(self._max, val)
- self._buckets[self._bucket_for(val)] += 1
-
- def get_data(self):
- with self._lock:
- data = stats_pb2.HistogramData()
- data.bucket.extend(self._buckets)
- data.min_seen = self._min
- data.max_seen = self._max
- data.sum = self._sum
- data.sum_of_squares = self._sum_of_squares
- data.count = self._count
- return data
-
- def _bucket_for(self, val):
- val = min(val, self._max_possible)
- return int(math.log(val, self.multiplier))
diff --git a/src/python/grpcio/tests/qps/qps_worker.py b/src/python/grpcio/tests/qps/qps_worker.py
deleted file mode 100644
index 16926379a5..0000000000
--- a/src/python/grpcio/tests/qps/qps_worker.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright 2016, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""The entry point for the qps worker."""
-
-import argparse
-import time
-
-from src.proto.grpc.testing import services_pb2
-
-from tests.qps import worker_server
-
-
-def run_worker_server(port):
- servicer = worker_server.WorkerServer()
- server = services_pb2.beta_create_WorkerService_server(servicer)
- server.add_insecure_port('[::]:{}'.format(port))
- server.start()
- servicer.wait_for_quit()
- server.stop(2)
-
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(
- description='gRPC Python performance testing worker')
- parser.add_argument('--driver_port',
- type=int,
- dest='port',
- help='The port the worker should listen on')
- args = parser.parse_args()
-
- run_worker_server(args.port)
diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py
deleted file mode 100644
index d41f8377c2..0000000000
--- a/src/python/grpcio/tests/qps/worker_server.py
+++ /dev/null
@@ -1,184 +0,0 @@
-# Copyright 2016, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import multiprocessing
-import random
-import threading
-import time
-
-from grpc.beta import implementations
-from grpc.framework.interfaces.face import utilities
-from src.proto.grpc.testing import control_pb2
-from src.proto.grpc.testing import services_pb2
-from src.proto.grpc.testing import stats_pb2
-
-from tests.qps import benchmark_client
-from tests.qps import benchmark_server
-from tests.qps import client_runner
-from tests.qps import histogram
-from tests.unit import resources
-
-
-class WorkerServer(services_pb2.BetaWorkerServiceServicer):
- """Python Worker Server implementation."""
-
- def __init__(self):
- self._quit_event = threading.Event()
-
- def RunServer(self, request_iterator, context):
- config = next(request_iterator).setup
- server, port = self._create_server(config)
- cores = multiprocessing.cpu_count()
- server.start()
- start_time = time.time()
- yield self._get_server_status(start_time, start_time, port, cores)
-
- for request in request_iterator:
- end_time = time.time()
- status = self._get_server_status(start_time, end_time, port, cores)
- if request.mark.reset:
- start_time = end_time
- yield status
- server.stop(0)
-
- def _get_server_status(self, start_time, end_time, port, cores):
- end_time = time.time()
- elapsed_time = end_time - start_time
- stats = stats_pb2.ServerStats(time_elapsed=elapsed_time,
- time_user=elapsed_time,
- time_system=elapsed_time)
- return control_pb2.ServerStatus(stats=stats, port=port, cores=cores)
-
- def _create_server(self, config):
- if config.server_type == control_pb2.SYNC_SERVER:
- servicer = benchmark_server.BenchmarkServer()
- server = services_pb2.beta_create_BenchmarkService_server(servicer)
- elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
- resp_size = config.payload_config.bytebuf_params.resp_size
- servicer = benchmark_server.GenericBenchmarkServer(resp_size)
- method_implementations = {
- ('grpc.testing.BenchmarkService', 'StreamingCall'):
- utilities.stream_stream_inline(servicer.StreamingCall),
- ('grpc.testing.BenchmarkService', 'UnaryCall'):
- utilities.unary_unary_inline(servicer.UnaryCall),
- }
- server = implementations.server(method_implementations)
- else:
- raise Exception('Unsupported server type {}'.format(config.server_type))
-
- if config.HasField('security_params'): # Use SSL
- server_creds = implementations.ssl_server_credentials([(
- resources.private_key(), resources.certificate_chain())])
- port = server.add_secure_port('[::]:{}'.format(config.port), server_creds)
- else:
- port = server.add_insecure_port('[::]:{}'.format(config.port))
-
- return (server, port)
-
- def RunClient(self, request_iterator, context):
- config = next(request_iterator).setup
- client_runners = []
- qps_data = histogram.Histogram(config.histogram_params.resolution,
- config.histogram_params.max_possible)
- start_time = time.time()
-
- # Create a client for each channel
- for i in xrange(config.client_channels):
- server = config.server_targets[i % len(config.server_targets)]
- runner = self._create_client_runner(server, config, qps_data)
- client_runners.append(runner)
- runner.start()
-
- end_time = time.time()
- yield self._get_client_status(start_time, end_time, qps_data)
-
- # Respond to stat requests
- for request in request_iterator:
- end_time = time.time()
- status = self._get_client_status(start_time, end_time, qps_data)
- if request.mark.reset:
- qps_data.reset()
- start_time = time.time()
- yield status
-
- # Cleanup the clients
- for runner in client_runners:
- runner.stop()
-
- def _get_client_status(self, start_time, end_time, qps_data):
- latencies = qps_data.get_data()
- end_time = time.time()
- elapsed_time = end_time - start_time
- stats = stats_pb2.ClientStats(latencies=latencies,
- time_elapsed=elapsed_time,
- time_user=elapsed_time,
- time_system=elapsed_time)
- return control_pb2.ClientStatus(stats=stats)
-
- def _create_client_runner(self, server, config, qps_data):
- if config.client_type == control_pb2.SYNC_CLIENT:
- if config.rpc_type == control_pb2.UNARY:
- client = benchmark_client.UnarySyncBenchmarkClient(
- server, config, qps_data)
- elif config.rpc_type == control_pb2.STREAMING:
- client = benchmark_client.StreamingSyncBenchmarkClient(
- server, config, qps_data)
- elif config.client_type == control_pb2.ASYNC_CLIENT:
- if config.rpc_type == control_pb2.UNARY:
- client = benchmark_client.UnaryAsyncBenchmarkClient(
- server, config, qps_data)
- else:
- raise Exception('Async streaming client not supported')
- else:
- raise Exception('Unsupported client type {}'.format(config.client_type))
-
- # In multi-channel tests, we split the load across all channels
- load_factor = float(config.client_channels)
- if config.load_params.WhichOneof('load') == 'closed_loop':
- runner = client_runner.ClosedLoopClientRunner(
- client, config.outstanding_rpcs_per_channel)
- else: # Open loop Poisson
- alpha = config.load_params.poisson.offered_load / load_factor
- def poisson():
- while True:
- yield random.expovariate(alpha)
-
- runner = client_runner.OpenLoopClientRunner(client, poisson())
-
- return runner
-
- def CoreCount(self, request, context):
- return control_pb2.CoreResponse(cores=multiprocessing.cpu_count())
-
- def QuitWorker(self, request, context):
- self._quit_event.set()
- return control_pb2.Void()
-
- def wait_for_quit(self):
- self._quit_event.wait()