diff options
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 5 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 51 | ||||
-rw-r--r-- | test/cpp/qps/qps_driver.cc | 9 |
3 files changed, 40 insertions, 25 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 6c0dfadbb9..0e5a3b05af 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -198,8 +198,9 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) { stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); std::chrono::system_clock::time_point time_now( - std::chrono::system_clock::now()), - time_limit(std::chrono::system_clock::now() + std::chrono::seconds(5)); + std::chrono::system_clock::now()); + std::chrono::system_clock::time_point time_limit( + std::chrono::system_clock::now() + std::chrono::seconds(10)); verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 0809eb5b6c..264293561d 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -66,11 +66,11 @@ class SynchronousClient : public Client { public: SynchronousClient(const ClientConfig& config) : Client(config) { num_threads_ = - config.outstanding_rpcs_per_channel() * config.client_channels(); + config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); } - virtual ~SynchronousClient() {}; + virtual ~SynchronousClient(){}; protected: size_t num_threads_; @@ -79,10 +79,12 @@ class SynchronousClient : public Client { class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { public: - SynchronousUnaryClient(const ClientConfig& config): - SynchronousClient(config) {StartThreads(num_threads_);} - ~SynchronousUnaryClient() {EndThreads();} - + SynchronousUnaryClient(const ClientConfig& config) + : SynchronousClient(config) { + StartThreads(num_threads_); + } + ~SynchronousUnaryClient() { EndThreads(); } + bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = Timer::Now(); @@ -96,43 +98,46 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { public: - SynchronousStreamingClient(const ClientConfig& config): - SynchronousClient(config) { - for (size_t thread_idx=0;thread_idx<num_threads_;thread_idx++){ + SynchronousStreamingClient(const ClientConfig& config) + : SynchronousClient(config), context_(num_threads_), stream_(num_threads_) { + for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_ = stub->StreamingCall(&context_); + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); } StartThreads(num_threads_); } ~SynchronousStreamingClient() { EndThreads(); - if (stream_) { - SimpleResponse response; - stream_->WritesDone(); - EXPECT_TRUE(stream_->Finish().IsOk()); + for (auto stream = stream_.begin(); stream != stream_.end(); stream++) { + if (*stream) { + (*stream)->WritesDone(); + EXPECT_TRUE((*stream)->Finish().IsOk()); + } } } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { double start = Timer::Now(); - if (stream_->Write(request_) && stream_->Read(&responses_[thread_idx])) { + if (stream_[thread_idx]->Write(request_) && + stream_[thread_idx]->Read(&responses_[thread_idx])) { histogram->Add((Timer::Now() - start) * 1e9); return true; } return false; } - private: - grpc::ClientContext context_; - std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, - SimpleResponse>> stream_; + + private: + std::vector<grpc::ClientContext> context_; + std::vector<std::unique_ptr<grpc::ClientReaderWriter< + SimpleRequest, SimpleResponse>>> stream_; }; -std::unique_ptr<Client> -CreateSynchronousUnaryClient(const ClientConfig& config) { +std::unique_ptr<Client> CreateSynchronousUnaryClient( + const ClientConfig& config) { return std::unique_ptr<Client>(new SynchronousUnaryClient(config)); } -std::unique_ptr<Client> -CreateSynchronousStreamingClient(const ClientConfig& config) { +std::unique_ptr<Client> CreateSynchronousStreamingClient( + const ClientConfig& config) { return std::unique_ptr<Client>(new SynchronousStreamingClient(config)); } diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 93b1247d73..fc8e04201c 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -94,6 +94,15 @@ int main(int argc, char** argv) { server_config.set_threads(FLAGS_server_threads); server_config.set_enable_ssl(FLAGS_enable_ssl); + // If we're running a sync-server streaming test, make sure + // that we have at least as many threads as the active streams + // or else threads will be blocked from forward progress and the + // client will deadlock on a timer. + GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER && + rpc_type == grpc::testing::STREAMING && + FLAGS_server_threads < FLAGS_client_channels * + FLAGS_outstanding_rpcs_per_channel)); + auto result = RunScenario(client_config, FLAGS_num_clients, server_config, FLAGS_num_servers, FLAGS_warmup_seconds, FLAGS_benchmark_seconds, |