From 92fe70e9836f2c761cbf5a15da34af9c13869697 Mon Sep 17 00:00:00 2001 From: vpai Date: Tue, 13 Jan 2015 11:21:38 -0800 Subject: Added new results to report resource usage and QPS, along with new proto support for this and also to turn the client into a server that can be controlled remotely (client part not implemented yet) Change on 2015/01/13 by vpai ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83863541 --- test/cpp/qps/client.cc | 47 ++++++++++--- test/cpp/qps/qpstest.proto | 162 +++++++++++++++++++++++++++++++++++++++++++++ test/cpp/qps/server.cc | 24 ++++++- 3 files changed, 223 insertions(+), 10 deletions(-) create mode 100644 test/cpp/qps/qpstest.proto (limited to 'test') diff --git a/test/cpp/qps/client.cc b/test/cpp/qps/client.cc index 287cef8192..71c3385035 100644 --- a/test/cpp/qps/client.cc +++ b/test/cpp/qps/client.cc @@ -46,7 +46,7 @@ #include #include #include "test/cpp/util/create_test_channel.h" -#include "test/cpp/interop/test.pb.h" +#include "test/cpp/qps/qpstest.pb.h" DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); DEFINE_int32(server_port, 0, "Server port."); @@ -74,8 +74,10 @@ DEFINE_string(workload, "", "Workload parameters"); using grpc::ChannelInterface; using grpc::CreateTestChannel; +using grpc::testing::ServerStats; using grpc::testing::SimpleRequest; using grpc::testing::SimpleResponse; +using grpc::testing::StatsRequest; using grpc::testing::TestService; static double now() { @@ -120,6 +122,14 @@ void RunTest(const int client_threads, const int client_channels, std::vector threads; // Will add threads when ready to execute std::vector<::gpr_histogram *> thread_stats(client_threads); + TestService::Stub *stub_stats = channels[0].get_stub(); + grpc::ClientContext context_stats_begin; + StatsRequest stats_request; + ServerStats server_stats_begin; + stats_request.set_test_num(0); + grpc::Status status_beg = stub_stats->CollectServerStats( + &context_stats_begin, stats_request, &server_stats_begin); + for (int i = 0; i < client_threads; i++) { gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); GPR_ASSERT(hist != NULL); @@ -161,11 +171,9 @@ void RunTest(const int client_threads, const int client_channels, } for (int i = 0; i < client_threads; i++) { gpr_histogram *h = thread_stats[i]; - gpr_log(GPR_INFO, "latency at thread %d (50/95/99/99.9): %f/%f/%f/%f", - i, - gpr_histogram_percentile(h, 50), - gpr_histogram_percentile(h, 95), - gpr_histogram_percentile(h, 99), + gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f", + i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90), + gpr_histogram_percentile(h, 95), gpr_histogram_percentile(h, 99), gpr_histogram_percentile(h, 99.9)); gpr_histogram_merge(hist, h); gpr_histogram_destroy(h); @@ -174,11 +182,32 @@ void RunTest(const int client_threads, const int client_channels, gpr_log( GPR_INFO, "latency across %d threads with %d channels and %d payload " - "(50/95/99/99.9): %f / %f / %f / %f", + "(50/90/95/99/99.9): %f / %f / %f / %f / %f", client_threads, client_channels, payload_size, - gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 95), - gpr_histogram_percentile(hist, 99), gpr_histogram_percentile(hist, 99.9)); + gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 90), + gpr_histogram_percentile(hist, 95), gpr_histogram_percentile(hist, 99), + gpr_histogram_percentile(hist, 99.9)); gpr_histogram_destroy(hist); + + grpc::ClientContext context_stats_end; + ServerStats server_stats_end; + grpc::Status status_end = stub_stats->CollectServerStats( + &context_stats_end, stats_request, &server_stats_end); + + double elapsed = server_stats_end.time_now() - server_stats_begin.time_now(); + int total_rpcs = client_threads * num_rpcs; + double utime = server_stats_end.time_user() - server_stats_begin.time_user(); + double stime = + server_stats_end.time_system() - server_stats_begin.time_system(); + gpr_log(GPR_INFO, + "Elapsed time: %.3f\n" + "RPC Count: %d\n" + "QPS: %.3f\n" + "System time: %.3f\n" + "User time: %.3f\n" + "Resource usage: %.1f%%\n", + elapsed, total_rpcs, total_rpcs / elapsed, stime, utime, + (stime + utime) / elapsed * 100.0); } int main(int argc, char **argv) { diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto new file mode 100644 index 0000000000..cd7a5174ed --- /dev/null +++ b/test/cpp/qps/qpstest.proto @@ -0,0 +1,162 @@ +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. +syntax = "proto2"; + +package grpc.testing; + +option java_api_version = 2; +option cc_api_version = 2; +option java_package = "com.google.net.stubby.testing.integration"; + +enum PayloadType { + // Compressable text format. + COMPRESSABLE= 1; + + // Uncompressable binary format. + UNCOMPRESSABLE = 2; + + // Randomly chosen from all other formats defined in this enum. + RANDOM = 3; +} + +message StatsRequest { + // run number + optional int32 test_num = 1; +} + +message ServerStats { + // wall clock time for timestamp + required double time_now = 1; + + // user time used by the server process and threads + required double time_user = 2; + + // server time used by the server process and all threads + required double time_system = 3; + + // RPC count so far + optional int32 num_rpcs = 4; +} + +message Payload { + // The type of data in body. + optional PayloadType type = 1; + // Primary contents of payload. + optional bytes body = 2; +} + +message Latencies { + required double l_50 = 1; + required double l_90 = 2; + required double l_99 = 3; + required double l_999 = 4; +} + +message StartArgs { + required string server_host = 1; + required int32 server_port = 2; + optional bool enable_ssl = 3 [default = false]; + optional int32 client_threads = 4 [default = 1]; + optional int32 client_channels = 5 [default = -1]; + optional int32 num_rpcs = 6 [default = 1]; + optional int32 payload_size = 7 [default = 1]; +} + +message StartResult { + required Latencies latencies = 1; + required int32 num_rpcs = 2; + required double time_elapsed = 3; + required double time_user = 4; + required double time_system = 5; +} + +message SimpleRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, server randomly chooses one from other formats. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + // Desired payload size in the response from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + optional int32 response_size = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message SimpleResponse { + optional Payload payload = 1; +} + +message StreamingInputCallRequest { + // Optional input payload sent along with the request. + optional Payload payload = 1; + + // Not expecting any payload from the response. +} + +message StreamingInputCallResponse { + // Aggregated size of payloads received from the client. + optional int32 aggregated_payload_size = 1; +} + +message ResponseParameters { + // Desired payload sizes in responses from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + required int32 size = 1; + + // Desired interval between consecutive responses in the response stream in + // microseconds. + required int32 interval_us = 2; +} + +message StreamingOutputCallRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, the payload from each response in the stream + // might be of different types. This is to simulate a mixed type of payload + // stream. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + repeated ResponseParameters response_parameters = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message StreamingOutputCallResponse { + optional Payload payload = 1; +} + +service TestService { + // Start test with specified workload + rpc StartTest(StartArgs) returns (Latencies); + + // Collect stats from server, ignore request content + rpc CollectServerStats(StatsRequest) returns (ServerStats); + + // One request followed by one response. + // The server returns the client payload as-is. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by a sequence of responses (streamed download). + // The server returns the payload with client desired type and sizes. + rpc StreamingOutputCall(StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by one response (streamed upload). + // The server returns the aggregated size of client payload as the result. + rpc StreamingInputCall(stream StreamingInputCallRequest) + returns (StreamingInputCallResponse); + + // A sequence of requests with each request served by the server immediately. + // As one request could lead to multiple responses, this interface + // demonstrates the idea of full duplexing. + rpc FullDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by a sequence of responses. + // The server buffers all the client requests and then serves them in order. A + // stream of responses are returned to the client when the server starts with + // first request. + rpc HalfDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); +} diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc index 5e426127f0..b2a4cde59f 100644 --- a/test/cpp/qps/server.cc +++ b/test/cpp/qps/server.cc @@ -31,6 +31,8 @@ * */ +#include +#include #include #include @@ -41,7 +43,7 @@ #include #include #include -#include "test/cpp/interop/test.pb.h" +#include "test/cpp/qps/qpstest.pb.h" #include #include @@ -54,11 +56,17 @@ using grpc::ServerBuilder; using grpc::ServerContext; using grpc::testing::Payload; using grpc::testing::PayloadType; +using grpc::testing::ServerStats; using grpc::testing::SimpleRequest; using grpc::testing::SimpleResponse; +using grpc::testing::StatsRequest; using grpc::testing::TestService; using grpc::Status; +static double time_double(struct timeval* tv) { + return tv->tv_sec + 1e-6 * tv->tv_usec; +} + bool SetPayload(PayloadType type, int size, Payload* payload) { PayloadType response_type = type; // TODO(yangg): Support UNCOMPRESSABLE payload. @@ -72,7 +80,21 @@ bool SetPayload(PayloadType type, int size, Payload* payload) { } class TestServiceImpl : public TestService::Service { + private: + int num_rpcs; + public: + Status CollectServerStats(ServerContext* context, const StatsRequest*, + ServerStats* response) { + struct rusage usage; + struct timeval tv; + gettimeofday(&tv, NULL); + getrusage(RUSAGE_SELF, &usage); + response->set_time_now(time_double(&tv)); + response->set_time_user(time_double(&usage.ru_utime)); + response->set_time_system(time_double(&usage.ru_stime)); + return Status::OK; + } Status UnaryCall(ServerContext* context, const SimpleRequest* request, SimpleResponse* response) { if (request->has_response_size() && request->response_size() > 0) { -- cgit v1.2.3