diff options
author | 2016-01-27 19:57:58 -0800 | |
---|---|---|
committer | 2016-01-27 19:57:58 -0800 | |
commit | b4e51b52bdd3a41f44e4959d08957f7142c9c56a (patch) | |
tree | 699df3ca4dfe307dc9e139dab565dc1db1a700fe /test/cpp | |
parent | 5a9462339dd35de13acd44e7c8a001ac727c038e (diff) | |
parent | f846aaf41baf34d68f9aa7e3daa2c65cd75dd7f1 (diff) |
Merge branch 'master' of github.com:grpc/grpc into sync-async-plus-interfaces
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 168 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 37 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 20 | ||||
-rw-r--r-- | test/cpp/qps/generic_async_streaming_ping_pong_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/qps_driver.cc | 4 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.cc | 24 | ||||
-rw-r--r-- | test/cpp/qps/server.h | 1 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 20 |
8 files changed, 249 insertions, 27 deletions
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 58b20e61c9..5a414ebc86 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -54,7 +54,6 @@ #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" -#include "test/cpp/end2end/test_service_impl.h" #include "test/cpp/util/string_ref_helper.h" using grpc::testing::EchoRequest; @@ -65,6 +64,40 @@ namespace grpc { namespace testing { namespace { +const char* kServerCancelAfterReads = "cancel_after_reads"; + +// When echo_deadline is requested, deadline seen in the ServerContext is set in +// the response in seconds. +void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, + EchoResponse* response) { + if (request->has_param() && request->param().echo_deadline()) { + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + if (context->deadline() != system_clock::time_point::max()) { + Timepoint2Timespec(context->deadline(), &deadline); + } + response->mutable_param()->set_request_deadline(deadline.tv_sec); + } +} + +void CheckServerAuthContext(const ServerContext* context, + const grpc::string& expected_client_identity) { + std::shared_ptr<const AuthContext> auth_ctx = context->auth_context(); + std::vector<grpc::string_ref> ssl = + auth_ctx->FindPropertyValues("transport_security_type"); + EXPECT_EQ(1u, ssl.size()); + EXPECT_EQ("ssl", ToString(ssl[0])); + if (expected_client_identity.length() == 0) { + EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty()); + EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty()); + EXPECT_FALSE(auth_ctx->IsPeerAuthenticated()); + } else { + auto identity = auth_ctx->GetPeerIdentity(); + EXPECT_TRUE(auth_ctx->IsPeerAuthenticated()); + EXPECT_EQ(1u, identity.size()); + EXPECT_EQ(expected_client_identity, identity[0]); + } +} + bool CheckIsLocalhost(const grpc::string& addr) { const grpc::string kIpv6("ipv6:[::1]:"); const grpc::string kIpv4MappedIpv6("ipv6:[::ffff:127.0.0.1]:"); @@ -179,6 +212,138 @@ class Proxy : public ::grpc::testing::EchoTestService::Service { std::unique_ptr< ::grpc::testing::EchoTestService::Stub> stub_; }; +class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { + public: + TestServiceImpl() : signal_client_(false), host_() {} + explicit TestServiceImpl(const grpc::string& host) + : signal_client_(false), host_(new grpc::string(host)) {} + + Status Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response) GRPC_OVERRIDE { + response->set_message(request->message()); + MaybeEchoDeadline(context, request, response); + if (host_) { + response->mutable_param()->set_host(*host_); + } + if (request->has_param() && request->param().client_cancel_after_us()) { + { + std::unique_lock<std::mutex> lock(mu_); + signal_client_ = true; + } + while (!context->IsCancelled()) { + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().client_cancel_after_us(), + GPR_TIMESPAN))); + } + return Status::CANCELLED; + } else if (request->has_param() && + request->param().server_cancel_after_us()) { + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().server_cancel_after_us(), + GPR_TIMESPAN))); + return Status::CANCELLED; + } else if (!request->has_param() || + !request->param().skip_cancelled_check()) { + EXPECT_FALSE(context->IsCancelled()); + } + + if (request->has_param() && request->param().echo_metadata()) { + const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = + context->client_metadata(); + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator + iter = client_metadata.begin(); + iter != client_metadata.end(); ++iter) { + context->AddTrailingMetadata(ToString(iter->first), + ToString(iter->second)); + } + } + if (request->has_param() && + (request->param().expected_client_identity().length() > 0 || + request->param().check_auth_context())) { + CheckServerAuthContext(context, + request->param().expected_client_identity()); + } + if (request->has_param() && + request->param().response_message_length() > 0) { + response->set_message( + grpc::string(request->param().response_message_length(), '\0')); + } + if (request->has_param() && request->param().echo_peer()) { + response->mutable_param()->set_peer(context->peer()); + } + return Status::OK; + } + + // Unimplemented is left unimplemented to test the returned error. + + Status RequestStream(ServerContext* context, + ServerReader<EchoRequest>* reader, + EchoResponse* response) GRPC_OVERRIDE { + EchoRequest request; + response->set_message(""); + int cancel_after_reads = 0; + const std::multimap<grpc::string_ref, grpc::string_ref>& + client_initial_metadata = context->client_metadata(); + if (client_initial_metadata.find(kServerCancelAfterReads) != + client_initial_metadata.end()) { + std::istringstream iss(ToString( + client_initial_metadata.find(kServerCancelAfterReads)->second)); + iss >> cancel_after_reads; + gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads); + } + while (reader->Read(&request)) { + if (cancel_after_reads == 1) { + gpr_log(GPR_INFO, "return cancel status"); + return Status::CANCELLED; + } else if (cancel_after_reads > 0) { + cancel_after_reads--; + } + response->mutable_message()->append(request.message()); + } + return Status::OK; + } + + // Return 3 messages. + // TODO(yangg) make it generic by adding a parameter into EchoRequest + Status ResponseStream(ServerContext* context, const EchoRequest* request, + ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE { + EchoResponse response; + response.set_message(request->message() + "0"); + writer->Write(response); + response.set_message(request->message() + "1"); + writer->Write(response); + response.set_message(request->message() + "2"); + writer->Write(response); + + return Status::OK; + } + + Status BidiStream(ServerContext* context, + ServerReaderWriter<EchoResponse, EchoRequest>* stream) + GRPC_OVERRIDE { + EchoRequest request; + EchoResponse response; + while (stream->Read(&request)) { + gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); + response.set_message(request.message()); + stream->Write(response); + } + return Status::OK; + } + + bool signal_client() { + std::unique_lock<std::mutex> lock(mu_); + return signal_client_; + } + + private: + bool signal_client_; + std::mutex mu_; + std::unique_ptr<grpc::string> host_; +}; + class TestServiceImplDupPkg : public ::grpc::testing::duplicate::EchoTestService::Service { public: @@ -659,6 +824,7 @@ TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) { EchoRequest request; EchoResponse response; request.set_message("Hello"); + request.mutable_param()->set_skip_cancelled_check(true); ClientContext context; std::chrono::system_clock::time_point deadline = diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 60ff7d9a32..cbb2dbe70b 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -48,6 +48,7 @@ #include <grpc++/client_context.h> #include <grpc++/generic/generic_stub.h> #include <grpc/grpc.h> +#include <grpc/support/cpu.h> #include <grpc/support/histogram.h> #include <grpc/support/log.h> @@ -160,20 +161,22 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { using Client::closed_loop_; using ClientImpl<StubType, RequestType>::channels_; using ClientImpl<StubType, RequestType>::request_; - AsyncClient(const ClientConfig& config, - std::function<ClientRpcContext*(int, StubType*, - const RequestType&)> setup_ctx, - std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> - create_stub) + AsyncClient( + const ClientConfig& config, + std::function<ClientRpcContext*(int, StubType*, const RequestType&)> + setup_ctx, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) : ClientImpl<StubType, RequestType>(config, create_stub), + num_async_threads_(NumThreads(config)), channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), channel_count_(config.client_channels()), - pref_channel_inc_(config.async_client_threads()) { - SetupLoadTest(config, config.async_client_threads()); + pref_channel_inc_(num_async_threads_) { + SetupLoadTest(config, num_async_threads_); - for (int i = 0; i < config.async_client_threads(); i++) { + for (int i = 0; i < num_async_threads_; i++) { cli_cqs_.emplace_back(new CompletionQueue); if (!closed_loop_) { rpc_deadlines_.emplace_back(); @@ -326,6 +329,9 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { return true; } + protected: + int num_async_threads_; + private: class boolean { // exists only to avoid data-race on vector<bool> public: @@ -340,6 +346,15 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { private: bool val_; }; + static int NumThreads(const ClientConfig& config) { + int num_threads = config.async_client_threads(); + if (num_threads <= 0) { // Use dynamic sizing + num_threads = gpr_cpu_num_cores(); + gpr_log(GPR_INFO, "Sizing client server to %d threads", num_threads); + } + return num_threads; + } + std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; std::vector<deadline_list> rpc_deadlines_; // per thread deadlines @@ -365,7 +380,7 @@ class AsyncUnaryClient GRPC_FINAL public: explicit AsyncUnaryClient(const ClientConfig& config) : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { - StartThreads(config.async_client_threads()); + StartThreads(num_async_threads_); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } @@ -463,7 +478,7 @@ class AsyncStreamingClient GRPC_FINAL // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); - StartThreads(config.async_client_threads()); + StartThreads(num_async_threads_); } ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } @@ -568,7 +583,7 @@ class GenericAsyncStreamingClient GRPC_FINAL // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); - StartThreads(config.async_client_threads()); + StartThreads(num_async_threads_); } ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 197d1ddd96..490156aec2 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -31,25 +31,25 @@ * */ +#include <deque> #include <list> #include <thread> -#include <deque> #include <vector> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/host_port.h> #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> #include "src/core/support/env.h" +#include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" #include "test/cpp/qps/driver.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/qps_worker.h" -#include "src/proto/grpc/testing/services.grpc.pb.h" using std::list; using std::thread; @@ -143,6 +143,12 @@ std::unique_ptr<ScenarioResult> RunScenario( } } + // if num_clients is set to <=0, do dynamic sizing: all workers + // except for servers are clients + if (num_clients <= 0) { + num_clients = workers.size() - num_servers; + } + // TODO(ctiller): support running multiple configurations, and binpack // client/server pairs // to available workers @@ -162,6 +168,8 @@ std::unique_ptr<ScenarioResult> RunScenario( // where class contained in std::vector must have a copy constructor auto* servers = new ServerData[num_servers]; for (size_t i = 0; i < num_servers; i++) { + gpr_log(GPR_INFO, "Starting server on %s (worker #%d)", workers[i].c_str(), + i); servers[i].stub = WorkerService::NewStub( CreateChannel(workers[i], InsecureChannelCredentials())); ServerArgs args; @@ -189,6 +197,8 @@ std::unique_ptr<ScenarioResult> RunScenario( // where class contained in std::vector must have a copy constructor auto* clients = new ClientData[num_clients]; for (size_t i = 0; i < num_clients; i++) { + gpr_log(GPR_INFO, "Starting client on %s (worker #%d)", + workers[i + num_servers].c_str(), i + num_servers); clients[i].stub = WorkerService::NewStub( CreateChannel(workers[i + num_servers], InsecureChannelCredentials())); ClientArgs args; diff --git a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc index 2b2e1c820f..81c0f24103 100644 --- a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc +++ b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc @@ -60,7 +60,7 @@ static void RunGenericAsyncStreamingPingPong() { bbuf->set_req_size(0); ServerConfig server_config; - server_config.set_server_type(ASYNC_SERVER); + server_config.set_server_type(ASYNC_GENERIC_SERVER); server_config.set_host("localhost"); server_config.set_async_server_threads(1); diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 9816a09592..680e4b19b9 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -153,7 +153,7 @@ static void QpsDriver() { ServerConfig server_config; server_config.set_server_type(server_type); - server_config.set_host("localhost"); + server_config.set_host("::"); // Use the wildcard server address server_config.set_async_server_threads(FLAGS_async_server_threads); if (FLAGS_secure_test) { @@ -170,7 +170,7 @@ static void QpsDriver() { GPR_ASSERT(!client_config.payload_config().has_bytebuf_params() || (client_config.client_type() == ASYNC_CLIENT && client_config.rpc_type() == STREAMING && - server_config.server_type() == ASYNC_SERVER)); + server_config.server_type() == ASYNC_GENERIC_SERVER)); const auto result = RunScenario( client_config, FLAGS_num_clients, server_config, FLAGS_num_servers, diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index c0276d05b3..6316605aaf 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -61,6 +61,11 @@ namespace grpc { namespace testing { static std::unique_ptr<Client> CreateClient(const ClientConfig& config) { + gpr_log(GPR_INFO, "Starting client of type %s %s %d", + ClientType_Name(config.client_type()).c_str(), + RpcType_Name(config.rpc_type()).c_str(), + config.payload_config().has_bytebuf_params()); + switch (config.client_type()) { case ClientType::SYNC_CLIENT: return (config.rpc_type() == RpcType::UNARY) @@ -81,6 +86,9 @@ static std::unique_ptr<Client> CreateClient(const ClientConfig& config) { static void LimitCores(int cores) {} static std::unique_ptr<Server> CreateServer(const ServerConfig& config) { + gpr_log(GPR_INFO, "Starting server of type %s", + ServerType_Name(config.server_type()).c_str()); + if (config.core_limit() > 0) { LimitCores(config.core_limit()); } @@ -89,6 +97,8 @@ static std::unique_ptr<Server> CreateServer(const ServerConfig& config) { return CreateSynchronousServer(config); case ServerType::ASYNC_SERVER: return CreateAsyncServer(config); + case ServerType::ASYNC_GENERIC_SERVER: + return CreateAsyncGenericServer(config); default: abort(); } @@ -169,22 +179,29 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { if (!args.has_setup()) { return Status(StatusCode::INVALID_ARGUMENT, ""); } + gpr_log(GPR_INFO, "RunClientBody: about to create client"); auto client = CreateClient(args.setup()); if (!client) { return Status(StatusCode::INVALID_ARGUMENT, ""); } + gpr_log(GPR_INFO, "RunClientBody: client created"); ClientStatus status; if (!stream->Write(status)) { return Status(StatusCode::UNKNOWN, ""); } + gpr_log(GPR_INFO, "RunClientBody: creation status reported"); while (stream->Read(&args)) { + gpr_log(GPR_INFO, "RunClientBody: Message read"); if (!args.has_mark()) { + gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!"); return Status(StatusCode::INVALID_ARGUMENT, ""); } *status.mutable_stats() = client->Mark(args.mark().reset()); stream->Write(status); + gpr_log(GPR_INFO, "RunClientBody: Mark response given"); } + gpr_log(GPR_INFO, "RunClientBody: Returning"); return Status::OK; } @@ -200,24 +217,31 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { if (server_port_ != 0) { args.mutable_setup()->set_port(server_port_); } + gpr_log(GPR_INFO, "RunServerBody: about to create server"); auto server = CreateServer(args.setup()); if (!server) { return Status(StatusCode::INVALID_ARGUMENT, ""); } + gpr_log(GPR_INFO, "RunServerBody: server created"); ServerStatus status; status.set_port(server->port()); status.set_cores(server->cores()); if (!stream->Write(status)) { return Status(StatusCode::UNKNOWN, ""); } + gpr_log(GPR_INFO, "RunServerBody: creation status reported"); while (stream->Read(&args)) { + gpr_log(GPR_INFO, "RunServerBody: Message read"); if (!args.has_mark()) { + gpr_log(GPR_INFO, "RunServerBody: Message not a mark!"); return Status(StatusCode::INVALID_ARGUMENT, ""); } *status.mutable_stats() = server->Mark(args.mark().reset()); stream->Write(status); + gpr_log(GPR_INFO, "RunServerBody: Mark response given"); } + gpr_log(GPR_INFO, "RunServerBody: Returning"); return Status::OK; } diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 32a3e85026..196fdac8f3 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -108,6 +108,7 @@ class Server { std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config); std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config); +std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 32a8fc48e6..b6656526b0 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -50,8 +50,8 @@ #include <grpc/support/log.h> #include <gtest/gtest.h> -#include "test/cpp/qps/server.h" #include "src/proto/grpc/testing/services.grpc.pb.h" +#include "test/cpp/qps/server.h" namespace grpc { namespace testing { @@ -85,7 +85,13 @@ class AsyncQpsServerTest : public Server { register_service(&builder, &async_service_); - for (int i = 0; i < config.async_server_threads(); i++) { + int num_threads = config.async_server_threads(); + if (num_threads <= 0) { // dynamic sizing + num_threads = cores(); + gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads); + } + + for (int i = 0; i < num_threads; i++) { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } @@ -96,8 +102,8 @@ class AsyncQpsServerTest : public Server { auto process_rpc_bound = std::bind(process_rpc, config.payload_config(), _1, _2); - for (int i = 0; i < 10000 / config.async_server_threads(); i++) { - for (int j = 0; j < config.async_server_threads(); j++) { + for (int i = 0; i < 10000 / num_threads; i++) { + for (int j = 0; j < num_threads; j++) { if (request_unary_function) { auto request_unary = std::bind(request_unary_function, &async_service_, _1, _2, _3, @@ -115,10 +121,10 @@ class AsyncQpsServerTest : public Server { } } - for (int i = 0; i < config.async_server_threads(); i++) { + for (int i = 0; i < num_threads; i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); } - for (int i = 0; i < config.async_server_threads(); i++) { + for (int i = 0; i < num_threads; i++) { threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i); } } @@ -373,7 +379,7 @@ static Status ProcessGenericRPC(const PayloadConfig &payload_config, const ByteBuffer *request, ByteBuffer *response) { int resp_size = payload_config.bytebuf_params().resp_size(); - std::unique_ptr<char> buf(new char[resp_size]); + std::unique_ptr<char[]> buf(new char[resp_size]); gpr_slice s = gpr_slice_from_copied_buffer(buf.get(), resp_size); Slice slice(s, Slice::STEAL_REF); *response = ByteBuffer(&slice, 1); |