aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar vjpai <vpai@google.com>2015-05-07 10:15:04 -0700
committerGravatar vjpai <vpai@google.com>2015-05-07 10:15:04 -0700
commitf993194b088f62d91f79462e348056e484bdb565 (patch)
tree3c35f02df0d9366f86a4936ca80325ef779e7eb4 /test
parent8dd7aab00e4f29caf7fbc360a320e21959dd8578 (diff)
parent0fe994401a387d4968f69dc9c75421364fdbb577 (diff)
Merge branch 'stream_ctx' into poisson
Conflicts: test/cpp/qps/client_sync.cc
Diffstat (limited to 'test')
-rw-r--r--test/cpp/qps/client_sync.cc49
-rw-r--r--test/cpp/qps/qps_driver.cc9
2 files changed, 36 insertions, 22 deletions
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 72bcead27d..6a89c5acc2 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -68,12 +68,12 @@ class SynchronousClient : public Client {
public:
SynchronousClient(const ClientConfig& config) : Client(config) {
num_threads_ =
- config.outstanding_rpcs_per_channel() * config.client_channels();
+ config.outstanding_rpcs_per_channel() * config.client_channels();
responses_.resize(num_threads_);
SetupLoadTest(config, num_threads_);
}
- virtual ~SynchronousClient() {};
+ virtual ~SynchronousClient(){};
protected:
void WaitToIssue(int thread_idx) {
@@ -89,9 +89,11 @@ class SynchronousClient : public Client {
class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
public:
- SynchronousUnaryClient(const ClientConfig& config):
- SynchronousClient(config) {StartThreads(num_threads_);}
- ~SynchronousUnaryClient() {EndThreads();}
+ SynchronousUnaryClient(const ClientConfig& config)
+ : SynchronousClient(config) {
+ StartThreads(num_threads_);
+ }
+ ~SynchronousUnaryClient() { EndThreads(); }
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
@@ -107,44 +109,47 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
public:
- SynchronousStreamingClient(const ClientConfig& config):
- SynchronousClient(config) {
- for (size_t thread_idx=0;thread_idx<num_threads_;thread_idx++){
+ SynchronousStreamingClient(const ClientConfig& config)
+ : SynchronousClient(config), context_(num_threads_), stream_(num_threads_) {
+ for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_ = stub->StreamingCall(&context_);
+ stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
}
StartThreads(num_threads_);
}
~SynchronousStreamingClient() {
EndThreads();
- if (stream_) {
- SimpleResponse response;
- stream_->WritesDone();
- EXPECT_TRUE(stream_->Finish().IsOk());
+ for (auto stream = stream_.begin(); stream != stream_.end(); stream++) {
+ if (*stream) {
+ (*stream)->WritesDone();
+ EXPECT_TRUE((*stream)->Finish().IsOk());
+ }
}
}
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
double start = Timer::Now();
- if (stream_->Write(request_) && stream_->Read(&responses_[thread_idx])) {
+ if (stream_[thread_idx]->Write(request_) &&
+ stream_[thread_idx]->Read(&responses_[thread_idx])) {
histogram->Add((Timer::Now() - start) * 1e9);
return true;
}
return false;
}
- private:
- grpc::ClientContext context_;
- std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest,
- SimpleResponse>> stream_;
+
+ private:
+ std::vector<grpc::ClientContext> context_;
+ std::vector<std::unique_ptr<grpc::ClientReaderWriter<
+ SimpleRequest, SimpleResponse>>> stream_;
};
-std::unique_ptr<Client>
-CreateSynchronousUnaryClient(const ClientConfig& config) {
+std::unique_ptr<Client> CreateSynchronousUnaryClient(
+ const ClientConfig& config) {
return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
}
-std::unique_ptr<Client>
-CreateSynchronousStreamingClient(const ClientConfig& config) {
+std::unique_ptr<Client> CreateSynchronousStreamingClient(
+ const ClientConfig& config) {
return std::unique_ptr<Client>(new SynchronousStreamingClient(config));
}
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index d0ec5c2b4b..dfa3f06753 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -135,6 +135,15 @@ int main(int argc, char** argv) {
server_config.set_threads(FLAGS_server_threads);
server_config.set_enable_ssl(FLAGS_enable_ssl);
+ // If we're running a sync-server streaming test, make sure
+ // that we have at least as many threads as the active streams
+ // or else threads will be blocked from forward progress and the
+ // client will deadlock on a timer.
+ GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER &&
+ rpc_type == grpc::testing::STREAMING &&
+ FLAGS_server_threads < FLAGS_client_channels *
+ FLAGS_outstanding_rpcs_per_channel));
+
auto result = RunScenario(client_config, FLAGS_num_clients,
server_config, FLAGS_num_servers,
FLAGS_warmup_seconds, FLAGS_benchmark_seconds,