aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/qps/benchmark_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_tests/tests/qps/benchmark_client.py')
-rw-r--r--src/python/grpcio_tests/tests/qps/benchmark_client.py280
1 files changed, 141 insertions, 139 deletions
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py
index 650e4756e7..2e8afc8e7f 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_client.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Defines test client behaviors (UNARY/STREAMING) (SYNC/ASYNC)."""
import abc
@@ -47,165 +46,168 @@ _TIMEOUT = 60 * 60 * 24
class GenericStub(object):
- def __init__(self, channel):
- self.UnaryCall = channel.unary_unary(
- '/grpc.testing.BenchmarkService/UnaryCall')
- self.StreamingCall = channel.stream_stream(
- '/grpc.testing.BenchmarkService/StreamingCall')
+ def __init__(self, channel):
+ self.UnaryCall = channel.unary_unary(
+ '/grpc.testing.BenchmarkService/UnaryCall')
+ self.StreamingCall = channel.stream_stream(
+ '/grpc.testing.BenchmarkService/StreamingCall')
class BenchmarkClient:
- """Benchmark client interface that exposes a non-blocking send_request()."""
-
- __metaclass__ = abc.ABCMeta
-
- def __init__(self, server, config, hist):
- # Create the stub
- if config.HasField('security_params'):
- creds = grpc.ssl_channel_credentials(resources.test_root_certificates())
- channel = test_common.test_secure_channel(
- server, creds, config.security_params.server_host_override)
- else:
- channel = grpc.insecure_channel(server)
-
- # waits for the channel to be ready before we start sending messages
- grpc.channel_ready_future(channel).result()
-
- if config.payload_config.WhichOneof('payload') == 'simple_params':
- self._generic = False
- self._stub = services_pb2.BenchmarkServiceStub(channel)
- payload = messages_pb2.Payload(
- body='\0' * config.payload_config.simple_params.req_size)
- self._request = messages_pb2.SimpleRequest(
- payload=payload,
- response_size=config.payload_config.simple_params.resp_size)
- else:
- self._generic = True
- self._stub = GenericStub(channel)
- self._request = '\0' * config.payload_config.bytebuf_params.req_size
-
- self._hist = hist
- self._response_callbacks = []
-
- def add_response_callback(self, callback):
- """callback will be invoked as callback(client, query_time)"""
- self._response_callbacks.append(callback)
-
- @abc.abstractmethod
- def send_request(self):
- """Non-blocking wrapper for a client's request operation."""
- raise NotImplementedError()
-
- def start(self):
- pass
-
- def stop(self):
- pass
-
- def _handle_response(self, client, query_time):
- self._hist.add(query_time * 1e9) # Report times in nanoseconds
- for callback in self._response_callbacks:
- callback(client, query_time)
+ """Benchmark client interface that exposes a non-blocking send_request()."""
+
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self, server, config, hist):
+ # Create the stub
+ if config.HasField('security_params'):
+ creds = grpc.ssl_channel_credentials(
+ resources.test_root_certificates())
+ channel = test_common.test_secure_channel(
+ server, creds, config.security_params.server_host_override)
+ else:
+ channel = grpc.insecure_channel(server)
+
+ # waits for the channel to be ready before we start sending messages
+ grpc.channel_ready_future(channel).result()
+
+ if config.payload_config.WhichOneof('payload') == 'simple_params':
+ self._generic = False
+ self._stub = services_pb2.BenchmarkServiceStub(channel)
+ payload = messages_pb2.Payload(
+ body='\0' * config.payload_config.simple_params.req_size)
+ self._request = messages_pb2.SimpleRequest(
+ payload=payload,
+ response_size=config.payload_config.simple_params.resp_size)
+ else:
+ self._generic = True
+ self._stub = GenericStub(channel)
+ self._request = '\0' * config.payload_config.bytebuf_params.req_size
+
+ self._hist = hist
+ self._response_callbacks = []
+
+ def add_response_callback(self, callback):
+ """callback will be invoked as callback(client, query_time)"""
+ self._response_callbacks.append(callback)
+
+ @abc.abstractmethod
+ def send_request(self):
+ """Non-blocking wrapper for a client's request operation."""
+ raise NotImplementedError()
+
+ def start(self):
+ pass
+
+ def stop(self):
+ pass
+
+ def _handle_response(self, client, query_time):
+ self._hist.add(query_time * 1e9) # Report times in nanoseconds
+ for callback in self._response_callbacks:
+ callback(client, query_time)
class UnarySyncBenchmarkClient(BenchmarkClient):
- def __init__(self, server, config, hist):
- super(UnarySyncBenchmarkClient, self).__init__(server, config, hist)
- self._pool = futures.ThreadPoolExecutor(
- max_workers=config.outstanding_rpcs_per_channel)
+ def __init__(self, server, config, hist):
+ super(UnarySyncBenchmarkClient, self).__init__(server, config, hist)
+ self._pool = futures.ThreadPoolExecutor(
+ max_workers=config.outstanding_rpcs_per_channel)
- def send_request(self):
- # Send requests in seperate threads to support multiple outstanding rpcs
- # (See src/proto/grpc/testing/control.proto)
- self._pool.submit(self._dispatch_request)
+ def send_request(self):
+ # Send requests in seperate threads to support multiple outstanding rpcs
+ # (See src/proto/grpc/testing/control.proto)
+ self._pool.submit(self._dispatch_request)
- def stop(self):
- self._pool.shutdown(wait=True)
- self._stub = None
+ def stop(self):
+ self._pool.shutdown(wait=True)
+ self._stub = None
- def _dispatch_request(self):
- start_time = time.time()
- self._stub.UnaryCall(self._request, _TIMEOUT)
- end_time = time.time()
- self._handle_response(self, end_time - start_time)
+ def _dispatch_request(self):
+ start_time = time.time()
+ self._stub.UnaryCall(self._request, _TIMEOUT)
+ end_time = time.time()
+ self._handle_response(self, end_time - start_time)
class UnaryAsyncBenchmarkClient(BenchmarkClient):
- def send_request(self):
- # Use the Future callback api to support multiple outstanding rpcs
- start_time = time.time()
- response_future = self._stub.UnaryCall.future(self._request, _TIMEOUT)
- response_future.add_done_callback(
- lambda resp: self._response_received(start_time, resp))
+ def send_request(self):
+ # Use the Future callback api to support multiple outstanding rpcs
+ start_time = time.time()
+ response_future = self._stub.UnaryCall.future(self._request, _TIMEOUT)
+ response_future.add_done_callback(
+ lambda resp: self._response_received(start_time, resp))
- def _response_received(self, start_time, resp):
- resp.result()
- end_time = time.time()
- self._handle_response(self, end_time - start_time)
+ def _response_received(self, start_time, resp):
+ resp.result()
+ end_time = time.time()
+ self._handle_response(self, end_time - start_time)
- def stop(self):
- self._stub = None
+ def stop(self):
+ self._stub = None
class _SyncStream(object):
- def __init__(self, stub, generic, request, handle_response):
- self._stub = stub
- self._generic = generic
- self._request = request
- self._handle_response = handle_response
- self._is_streaming = False
- self._request_queue = queue.Queue()
- self._send_time_queue = queue.Queue()
-
- def send_request(self):
- self._send_time_queue.put(time.time())
- self._request_queue.put(self._request)
-
- def start(self):
- self._is_streaming = True
- response_stream = self._stub.StreamingCall(
- self._request_generator(), _TIMEOUT)
- for _ in response_stream:
- self._handle_response(
- self, time.time() - self._send_time_queue.get_nowait())
-
- def stop(self):
- self._is_streaming = False
-
- def _request_generator(self):
- while self._is_streaming:
- try:
- request = self._request_queue.get(block=True, timeout=1.0)
- yield request
- except queue.Empty:
- pass
+ def __init__(self, stub, generic, request, handle_response):
+ self._stub = stub
+ self._generic = generic
+ self._request = request
+ self._handle_response = handle_response
+ self._is_streaming = False
+ self._request_queue = queue.Queue()
+ self._send_time_queue = queue.Queue()
+
+ def send_request(self):
+ self._send_time_queue.put(time.time())
+ self._request_queue.put(self._request)
+
+ def start(self):
+ self._is_streaming = True
+ response_stream = self._stub.StreamingCall(self._request_generator(),
+ _TIMEOUT)
+ for _ in response_stream:
+ self._handle_response(
+ self, time.time() - self._send_time_queue.get_nowait())
+
+ def stop(self):
+ self._is_streaming = False
+
+ def _request_generator(self):
+ while self._is_streaming:
+ try:
+ request = self._request_queue.get(block=True, timeout=1.0)
+ yield request
+ except queue.Empty:
+ pass
class StreamingSyncBenchmarkClient(BenchmarkClient):
- def __init__(self, server, config, hist):
- super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
- self._pool = futures.ThreadPoolExecutor(
- max_workers=config.outstanding_rpcs_per_channel)
- self._streams = [_SyncStream(self._stub, self._generic,
- self._request, self._handle_response)
- for _ in xrange(config.outstanding_rpcs_per_channel)]
- self._curr_stream = 0
-
- def send_request(self):
- # Use a round_robin scheduler to determine what stream to send on
- self._streams[self._curr_stream].send_request()
- self._curr_stream = (self._curr_stream + 1) % len(self._streams)
-
- def start(self):
- for stream in self._streams:
- self._pool.submit(stream.start)
-
- def stop(self):
- for stream in self._streams:
- stream.stop()
- self._pool.shutdown(wait=True)
- self._stub = None
+ def __init__(self, server, config, hist):
+ super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
+ self._pool = futures.ThreadPoolExecutor(
+ max_workers=config.outstanding_rpcs_per_channel)
+ self._streams = [
+ _SyncStream(self._stub, self._generic, self._request,
+ self._handle_response)
+ for _ in xrange(config.outstanding_rpcs_per_channel)
+ ]
+ self._curr_stream = 0
+
+ def send_request(self):
+ # Use a round_robin scheduler to determine what stream to send on
+ self._streams[self._curr_stream].send_request()
+ self._curr_stream = (self._curr_stream + 1) % len(self._streams)
+
+ def start(self):
+ for stream in self._streams:
+ self._pool.submit(stream.start)
+
+ def stop(self):
+ for stream in self._streams:
+ stream.stop()
+ self._pool.shutdown(wait=True)
+ self._stub = None