aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/tests/qps/benchmark_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/tests/qps/benchmark_client.py')
-rw-r--r--src/python/grpcio/tests/qps/benchmark_client.py88
1 files changed, 59 insertions, 29 deletions
diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py
index eed0b0c6da..080281415d 100644
--- a/src/python/grpcio/tests/qps/benchmark_client.py
+++ b/src/python/grpcio/tests/qps/benchmark_client.py
@@ -30,15 +30,15 @@
"""Defines test client behaviors (UNARY/STREAMING) (SYNC/ASYNC)."""
import abc
+import threading
import time
-try:
- import Queue as queue # Python 2.x
-except ImportError:
- import queue # Python 3
from concurrent import futures
+from six.moves import queue
+import grpc
from grpc.beta import implementations
+from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import services_pb2
from tests.unit import resources
@@ -64,6 +64,13 @@ class BenchmarkClient:
else:
channel = implementations.insecure_channel(host, port)
+ connected_event = threading.Event()
+ def wait_for_ready(connectivity):
+ if connectivity == grpc.ChannelConnectivity.READY:
+ connected_event.set()
+ channel.subscribe(wait_for_ready, try_to_connect=True)
+ connected_event.wait()
+
if config.payload_config.WhichOneof('payload') == 'simple_params':
self._generic = False
self._stub = services_pb2.beta_create_BenchmarkService_stub(channel)
@@ -81,6 +88,7 @@ class BenchmarkClient:
self._response_callbacks = []
def add_response_callback(self, callback):
+ """callback will be invoked as callback(client, query_time)"""
self._response_callbacks.append(callback)
@abc.abstractmethod
@@ -94,10 +102,10 @@ class BenchmarkClient:
def stop(self):
pass
- def _handle_response(self, query_time):
+ def _handle_response(self, client, query_time):
self._hist.add(query_time * 1e9) # Report times in nanoseconds
for callback in self._response_callbacks:
- callback(query_time)
+ callback(client, query_time)
class UnarySyncBenchmarkClient(BenchmarkClient):
@@ -120,7 +128,7 @@ class UnarySyncBenchmarkClient(BenchmarkClient):
start_time = time.time()
self._stub.UnaryCall(self._request, _TIMEOUT)
end_time = time.time()
- self._handle_response(end_time - start_time)
+ self._handle_response(self, end_time - start_time)
class UnaryAsyncBenchmarkClient(BenchmarkClient):
@@ -135,19 +143,20 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient):
def _response_received(self, start_time, resp):
resp.result()
end_time = time.time()
- self._handle_response(end_time - start_time)
+ self._handle_response(self, end_time - start_time)
def stop(self):
self._stub = None
-class StreamingAsyncBenchmarkClient(BenchmarkClient):
+class _SyncStream(object):
- def __init__(self, server, config, hist):
- super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
+ def __init__(self, stub, generic, request, handle_response):
+ self._stub = stub
+ self._generic = generic
+ self._request = request
+ self._handle_response = handle_response
self._is_streaming = False
- self._pool = futures.ThreadPoolExecutor(max_workers=1)
- # Use a thread-safe queue to put requests on the stream
self._request_queue = queue.Queue()
self._send_time_queue = queue.Queue()
@@ -157,25 +166,19 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
def start(self):
self._is_streaming = True
- self._pool.submit(self._request_stream)
-
- def stop(self):
- self._is_streaming = False
- self._pool.shutdown(wait=True)
- self._stub = None
-
- def _request_stream(self):
- self._is_streaming = True
if self._generic:
- response_stream = self._stub.inline_stream_stream(
- 'grpc.testing.BenchmarkService', 'StreamingCall',
- self._request_generator(), _TIMEOUT)
+ stream_callable = self._stub.stream_stream(
+ 'grpc.testing.BenchmarkService', 'StreamingCall')
else:
- response_stream = self._stub.StreamingCall(self._request_generator(),
- _TIMEOUT)
+ stream_callable = self._stub.StreamingCall
+
+ response_stream = stream_callable(self._request_generator(), _TIMEOUT)
for _ in response_stream:
- end_time = time.time()
- self._handle_response(end_time - self._send_time_queue.get_nowait())
+ self._handle_response(
+ self, time.time() - self._send_time_queue.get_nowait())
+
+ def stop(self):
+ self._is_streaming = False
def _request_generator(self):
while self._is_streaming:
@@ -184,3 +187,30 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
yield request
except queue.Empty:
pass
+
+
+class StreamingSyncBenchmarkClient(BenchmarkClient):
+
+ def __init__(self, server, config, hist):
+ super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
+ self._pool = futures.ThreadPoolExecutor(
+ max_workers=config.outstanding_rpcs_per_channel)
+ self._streams = [_SyncStream(self._stub, self._generic,
+ self._request, self._handle_response)
+ for _ in xrange(config.outstanding_rpcs_per_channel)]
+ self._curr_stream = 0
+
+ def send_request(self):
+ # Use a round_robin scheduler to determine what stream to send on
+ self._streams[self._curr_stream].send_request()
+ self._curr_stream = (self._curr_stream + 1) % len(self._streams)
+
+ def start(self):
+ for stream in self._streams:
+ self._pool.submit(stream.start)
+
+ def stop(self):
+ for stream in self._streams:
+ stream.stop()
+ self._pool.shutdown(wait=True)
+ self._stub = None