aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/tests/qps/worker_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/tests/qps/worker_server.py')
-rw-r--r--src/python/grpcio/tests/qps/worker_server.py184
1 files changed, 184 insertions, 0 deletions
diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py
new file mode 100644
index 0000000000..0b3acc14e7
--- /dev/null
+++ b/src/python/grpcio/tests/qps/worker_server.py
@@ -0,0 +1,184 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import multiprocessing
+import random
+import threading
+import time
+
+from grpc.beta import implementations
+from grpc.framework.interfaces.face import utilities
+from src.proto.grpc.testing import control_pb2
+from src.proto.grpc.testing import services_pb2
+from src.proto.grpc.testing import stats_pb2
+
+from tests.qps import benchmark_client
+from tests.qps import benchmark_server
+from tests.qps import client_runner
+from tests.qps import histogram
+from tests.unit import resources
+
+
+class WorkerServer(services_pb2.BetaWorkerServiceServicer):
+ """Python Worker Server implementation."""
+
+ def __init__(self):
+ self._quit_event = threading.Event()
+
+ def RunServer(self, request_iterator, context):
+ config = next(request_iterator).setup
+ server, port = self._create_server(config)
+ cores = multiprocessing.cpu_count()
+ server.start()
+ start_time = time.time()
+ yield self._get_server_status(start_time, start_time, port, cores)
+
+ for request in request_iterator:
+ end_time = time.time()
+ status = self._get_server_status(start_time, end_time, port, cores)
+ if request.mark.reset:
+ start_time = end_time
+ yield status
+ server.stop(0)
+
+ def _get_server_status(self, start_time, end_time, port, cores):
+ end_time = time.time()
+ elapsed_time = end_time - start_time
+ stats = stats_pb2.ServerStats(time_elapsed=elapsed_time,
+ time_user=elapsed_time,
+ time_system=elapsed_time)
+ return control_pb2.ServerStatus(stats=stats, port=port, cores=cores)
+
+ def _create_server(self, config):
+ if config.server_type == control_pb2.SYNC_SERVER:
+ servicer = benchmark_server.BenchmarkServer()
+ server = services_pb2.beta_create_BenchmarkService_server(servicer)
+ elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
+ resp_size = config.payload_config.bytebuf_params.resp_size
+ servicer = benchmark_server.GenericBenchmarkServer(resp_size)
+ method_implementations = {
+ ('grpc.testing.BenchmarkService', 'StreamingCall'):
+ utilities.stream_stream_inline(servicer.StreamingCall),
+ ('grpc.testing.BenchmarkService', 'UnaryCall'):
+ utilities.unary_unary_inline(servicer.UnaryCall),
+ }
+ server = implementations.server(method_implementations)
+ else:
+ raise Exception('Unsupported server type {}'.format(config.server_type))
+
+ if config.HasField('security_params'): # Use SSL
+ server_creds = implementations.ssl_server_credentials([(
+ resources.private_key(), resources.certificate_chain())])
+ port = server.add_secure_port('[::]:{}'.format(config.port), server_creds)
+ else:
+ port = server.add_insecure_port('[::]:{}'.format(config.port))
+
+ return (server, port)
+
+ def RunClient(self, request_iterator, context):
+ config = next(request_iterator).setup
+ client_runners = []
+ qps_data = histogram.Histogram(config.histogram_params.resolution,
+ config.histogram_params.max_possible)
+ start_time = time.time()
+
+ # Create a client for each channel
+ for i in xrange(config.client_channels):
+ server = config.server_targets[i % len(config.server_targets)]
+ runner = self._create_client_runner(server, config, qps_data)
+ client_runners.append(runner)
+ runner.start()
+
+ end_time = time.time()
+ yield self._get_client_status(start_time, end_time, qps_data)
+
+ # Respond to stat requests
+ for request in request_iterator:
+ end_time = time.time()
+ status = self._get_client_status(start_time, end_time, qps_data)
+ if request.mark.reset:
+ qps_data.reset()
+ start_time = time.time()
+ yield status
+
+ # Cleanup the clients
+ for runner in client_runners:
+ runner.stop()
+
+ def _get_client_status(self, start_time, end_time, qps_data):
+ latencies = qps_data.get_data()
+ end_time = time.time()
+ elapsed_time = end_time - start_time
+ stats = stats_pb2.ClientStats(latencies=latencies,
+ time_elapsed=elapsed_time,
+ time_user=elapsed_time,
+ time_system=elapsed_time)
+ return control_pb2.ClientStatus(stats=stats)
+
+ def _create_client_runner(self, server, config, qps_data):
+ if config.client_type == control_pb2.SYNC_CLIENT:
+ if config.rpc_type == control_pb2.UNARY:
+ client = benchmark_client.UnarySyncBenchmarkClient(
+ server, config, qps_data)
+ else:
+ raise Exception('STREAMING SYNC client not supported')
+ elif config.client_type == control_pb2.ASYNC_CLIENT:
+ if config.rpc_type == control_pb2.UNARY:
+ client = benchmark_client.UnaryAsyncBenchmarkClient(
+ server, config, qps_data)
+ elif config.rpc_type == control_pb2.STREAMING:
+ client = benchmark_client.StreamingAsyncBenchmarkClient(
+ server, config, qps_data)
+ else:
+ raise Exception('Unsupported client type {}'.format(config.client_type))
+
+ # In multi-channel tests, we split the load across all channels
+ load_factor = float(config.client_channels)
+ if config.load_params.WhichOneof('load') == 'closed_loop':
+ runner = client_runner.ClosedLoopClientRunner(
+ client, config.outstanding_rpcs_per_channel)
+ else: # Open loop Poisson
+ alpha = config.load_params.poisson.offered_load / load_factor
+ def poisson():
+ while True:
+ yield random.expovariate(alpha)
+
+ runner = client_runner.OpenLoopClientRunner(client, poisson())
+
+ return runner
+
+ def CoreCount(self, request, context):
+ return control_pb2.CoreResponse(cores=multiprocessing.cpu_count())
+
+ def QuitWorker(self, request, context):
+ self._quit_event.set()
+ return control_pb2.Void()
+
+ def wait_for_quit(self):
+ self._quit_event.wait()