aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/qps
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-07-08 14:01:39 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-07-08 14:01:39 -0700
commit919175b6194e28476626b235ae4fe02caede0bcf (patch)
treebb39dc431fd237c3859800e46804a8e1e78623d0 /src/python/grpcio_tests/tests/qps
parentece4aaf578ac61e07b0a9a30a50b88425cdb7e25 (diff)
parent32d3fbe284ddd7e90b49cdf72349d661869969ca (diff)
Merge github.com:grpc/grpc into grand-unified-closures
Diffstat (limited to 'src/python/grpcio_tests/tests/qps')
-rw-r--r--src/python/grpcio_tests/tests/qps/benchmark_client.py37
-rw-r--r--src/python/grpcio_tests/tests/qps/benchmark_server.py4
-rw-r--r--src/python/grpcio_tests/tests/qps/qps_worker.py7
-rw-r--r--src/python/grpcio_tests/tests/qps/worker_server.py36
4 files changed, 48 insertions, 36 deletions
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py
index 080281415d..83b46c914e 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_client.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py
@@ -37,16 +37,23 @@ from concurrent import futures
from six.moves import queue
import grpc
-from grpc.beta import implementations
-from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import services_pb2
from tests.unit import resources
-from tests.unit.beta import test_utilities
+from tests.unit import test_common
_TIMEOUT = 60 * 60 * 24
+class GenericStub(object):
+
+ def __init__(self, channel):
+ self.UnaryCall = channel.unary_unary(
+ '/grpc.testing.BenchmarkService/UnaryCall')
+ self.StreamingCall = channel.stream_stream(
+ '/grpc.testing.BenchmarkService/StreamingCall')
+
+
class BenchmarkClient:
"""Benchmark client interface that exposes a non-blocking send_request()."""
@@ -54,15 +61,12 @@ class BenchmarkClient:
def __init__(self, server, config, hist):
# Create the stub
- host, port = server.split(':')
- port = int(port)
if config.HasField('security_params'):
- creds = implementations.ssl_channel_credentials(
- resources.test_root_certificates())
- channel = test_utilities.not_really_secure_channel(
- host, port, creds, config.security_params.server_host_override)
+ creds = grpc.ssl_channel_credentials(resources.test_root_certificates())
+ channel = test_common.test_secure_channel(
+ server, creds, config.security_params.server_host_override)
else:
- channel = implementations.insecure_channel(host, port)
+ channel = grpc.insecure_channel(server)
connected_event = threading.Event()
def wait_for_ready(connectivity):
@@ -73,7 +77,7 @@ class BenchmarkClient:
if config.payload_config.WhichOneof('payload') == 'simple_params':
self._generic = False
- self._stub = services_pb2.beta_create_BenchmarkService_stub(channel)
+ self._stub = services_pb2.BenchmarkServiceStub(channel)
payload = messages_pb2.Payload(
body='\0' * config.payload_config.simple_params.req_size)
self._request = messages_pb2.SimpleRequest(
@@ -81,7 +85,7 @@ class BenchmarkClient:
response_size=config.payload_config.simple_params.resp_size)
else:
self._generic = True
- self._stub = implementations.generic_stub(channel)
+ self._stub = GenericStub(channel)
self._request = '\0' * config.payload_config.bytebuf_params.req_size
self._hist = hist
@@ -166,13 +170,8 @@ class _SyncStream(object):
def start(self):
self._is_streaming = True
- if self._generic:
- stream_callable = self._stub.stream_stream(
- 'grpc.testing.BenchmarkService', 'StreamingCall')
- else:
- stream_callable = self._stub.StreamingCall
-
- response_stream = stream_callable(self._request_generator(), _TIMEOUT)
+ response_stream = self._stub.StreamingCall(
+ self._request_generator(), _TIMEOUT)
for _ in response_stream:
self._handle_response(
self, time.time() - self._send_time_queue.get_nowait())
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py
index 8cbf480d58..2b76b810cd 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_server.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py
@@ -31,7 +31,7 @@ from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import services_pb2
-class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer):
+class BenchmarkServer(services_pb2.BenchmarkServiceServicer):
"""Synchronous Server implementation for the Benchmark service."""
def UnaryCall(self, request, context):
@@ -44,7 +44,7 @@ class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer):
yield messages_pb2.SimpleResponse(payload=payload)
-class GenericBenchmarkServer(services_pb2.BetaBenchmarkServiceServicer):
+class GenericBenchmarkServer(services_pb2.BenchmarkServiceServicer):
"""Generic Server implementation for the Benchmark service."""
def __init__(self, resp_size):
diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py
index 16926379a5..3abf0d08dd 100644
--- a/src/python/grpcio_tests/tests/qps/qps_worker.py
+++ b/src/python/grpcio_tests/tests/qps/qps_worker.py
@@ -32,18 +32,21 @@
import argparse
import time
+from concurrent import futures
+import grpc
from src.proto.grpc.testing import services_pb2
from tests.qps import worker_server
def run_worker_server(port):
+ server = grpc.server((), futures.ThreadPoolExecutor(max_workers=5))
servicer = worker_server.WorkerServer()
- server = services_pb2.beta_create_WorkerService_server(servicer)
+ services_pb2.add_WorkerServiceServicer_to_server(servicer, server)
server.add_insecure_port('[::]:{}'.format(port))
server.start()
servicer.wait_for_quit()
- server.stop(2)
+ server.stop(0)
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py
index d41f8377c2..932a1ffe2b 100644
--- a/src/python/grpcio_tests/tests/qps/worker_server.py
+++ b/src/python/grpcio_tests/tests/qps/worker_server.py
@@ -32,8 +32,8 @@ import random
import threading
import time
-from grpc.beta import implementations
-from grpc.framework.interfaces.face import utilities
+from concurrent import futures
+import grpc
from src.proto.grpc.testing import control_pb2
from src.proto.grpc.testing import services_pb2
from src.proto.grpc.testing import stats_pb2
@@ -45,7 +45,7 @@ from tests.qps import histogram
from tests.unit import resources
-class WorkerServer(services_pb2.BetaWorkerServiceServicer):
+class WorkerServer(services_pb2.WorkerServiceServicer):
"""Python Worker Server implementation."""
def __init__(self):
@@ -65,7 +65,7 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer):
if request.mark.reset:
start_time = end_time
yield status
- server.stop(0)
+ server.stop(None)
def _get_server_status(self, start_time, end_time, port, cores):
end_time = time.time()
@@ -76,25 +76,35 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer):
return control_pb2.ServerStatus(stats=stats, port=port, cores=cores)
def _create_server(self, config):
- if config.server_type == control_pb2.SYNC_SERVER:
+ if config.async_server_threads == 0:
+ # This is the default concurrent.futures thread pool size, but
+ # None doesn't seem to work
+ server_threads = multiprocessing.cpu_count() * 5
+ else:
+ server_threads = config.async_server_threads
+ server = grpc.server((), futures.ThreadPoolExecutor(
+ max_workers=server_threads))
+ if config.server_type == control_pb2.ASYNC_SERVER:
servicer = benchmark_server.BenchmarkServer()
- server = services_pb2.beta_create_BenchmarkService_server(servicer)
+ services_pb2.add_BenchmarkServiceServicer_to_server(servicer, server)
elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
resp_size = config.payload_config.bytebuf_params.resp_size
servicer = benchmark_server.GenericBenchmarkServer(resp_size)
method_implementations = {
- ('grpc.testing.BenchmarkService', 'StreamingCall'):
- utilities.stream_stream_inline(servicer.StreamingCall),
- ('grpc.testing.BenchmarkService', 'UnaryCall'):
- utilities.unary_unary_inline(servicer.UnaryCall),
+ 'StreamingCall':
+ grpc.stream_stream_rpc_method_handler(servicer.StreamingCall),
+ 'UnaryCall':
+ grpc.unary_unary_rpc_method_handler(servicer.UnaryCall),
}
- server = implementations.server(method_implementations)
+ handler = grpc.method_handlers_generic_handler(
+ 'grpc.testing.BenchmarkService', method_implementations)
+ server.add_generic_rpc_handlers((handler,))
else:
raise Exception('Unsupported server type {}'.format(config.server_type))
if config.HasField('security_params'): # Use SSL
- server_creds = implementations.ssl_server_credentials([(
- resources.private_key(), resources.certificate_chain())])
+ server_creds = grpc.ssl_server_credentials(
+ ((resources.private_key(), resources.certificate_chain()),))
port = server.add_secure_port('[::]:{}'.format(config.port), server_creds)
else:
port = server.add_insecure_port('[::]:{}'.format(config.port))