diff options
Diffstat (limited to 'test/cpp')
21 files changed, 428 insertions, 103 deletions
diff --git a/test/cpp/common/alarm_cpp_test.cc b/test/cpp/common/alarm_cpp_test.cc index 4745ef14ec..d4381c0515 100644 --- a/test/cpp/common/alarm_cpp_test.cc +++ b/test/cpp/common/alarm_cpp_test.cc @@ -55,6 +55,53 @@ TEST(AlarmTest, RegularExpiry) { EXPECT_EQ(junk, output_tag); } +TEST(AlarmTest, RegularExpiryChrono) { + CompletionQueue cq; + void* junk = reinterpret_cast<void*>(1618033); + std::chrono::system_clock::time_point one_sec_deadline = + std::chrono::system_clock::now() + std::chrono::seconds(1); + Alarm alarm(&cq, one_sec_deadline, junk); + + void* output_tag; + bool ok; + const CompletionQueue::NextStatus status = cq.AsyncNext( + (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2)); + + EXPECT_EQ(status, CompletionQueue::GOT_EVENT); + EXPECT_TRUE(ok); + EXPECT_EQ(junk, output_tag); +} + +TEST(AlarmTest, ZeroExpiry) { + CompletionQueue cq; + void* junk = reinterpret_cast<void*>(1618033); + Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0), junk); + + void* output_tag; + bool ok; + const CompletionQueue::NextStatus status = cq.AsyncNext( + (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0)); + + EXPECT_EQ(status, CompletionQueue::GOT_EVENT); + EXPECT_TRUE(ok); + EXPECT_EQ(junk, output_tag); +} + +TEST(AlarmTest, NegativeExpiry) { + CompletionQueue cq; + void* junk = reinterpret_cast<void*>(1618033); + Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(-1), junk); + + void* output_tag; + bool ok; + const CompletionQueue::NextStatus status = cq.AsyncNext( + (void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(0)); + + EXPECT_EQ(status, CompletionQueue::GOT_EVENT); + EXPECT_TRUE(ok); + EXPECT_EQ(junk, output_tag); +} + TEST(AlarmTest, Cancellation) { CompletionQueue cq; void* junk = reinterpret_cast<void*>(1618033); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index a15cbd7ee2..9ca3bf98f8 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -989,6 +989,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { if (server_try_cancel == CANCEL_AFTER_PROCESSING) { ServerTryCancel(&srv_ctx); + + // Client reads may fail bacause it is notified that the stream is + // cancelled. + ignore_cq_result = true; } // Client attemts to read the three messages from the server diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index ce8e4d2a10..dc2c4f6426 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -437,9 +437,10 @@ class End2endServerTryCancelTest : public End2endTest { break; case CANCEL_AFTER_PROCESSING: - // Server cancelled after writing all messages. Client must have read - // all messages - EXPECT_EQ(num_msgs_read, kNumResponseStreamsMsgs); + // Even though the Server cancelled after writing all messages, the RPC + // may be cancelled before the Client got a chance to read all the + // messages. + EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs); break; default: { @@ -519,7 +520,11 @@ class End2endServerTryCancelTest : public End2endTest { case CANCEL_AFTER_PROCESSING: EXPECT_EQ(num_msgs_sent, num_messages); - EXPECT_EQ(num_msgs_read, num_msgs_sent); + + // The Server cancelled after reading the last message and after writing + // the message to the client. However, the RPC cancellation might have + // taken effect before the client actually read the response. + EXPECT_LE(num_msgs_read, num_msgs_sent); break; default: @@ -904,9 +909,9 @@ TEST_P(End2endTest, SimultaneousReadWritesDone) { std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev); gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); stream->WritesDone(); + reader_thread.join(); Status s = stream->Finish(); EXPECT_TRUE(s.ok()); - reader_thread.join(); } TEST_P(End2endTest, ChannelState) { diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 66d11d0dfc..7c3e514eff 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -326,7 +326,11 @@ void TestServiceImpl::ServerTryCancel(ServerContext* context) { EXPECT_FALSE(context->IsCancelled()); context->TryCancel(); gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); - EXPECT_TRUE(context->IsCancelled()); + // Now wait until it's really canceled + while (!context->IsCancelled()) { + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000, GPR_TIMESPAN))); + } } } // namespace testing diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc new file mode 100644 index 0000000000..bd4885fb4c --- /dev/null +++ b/test/cpp/grpclb/grpclb_api_test.cc @@ -0,0 +1,133 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <gtest/gtest.h> +#include <string> + +#include "src/core/client_config/lb_policies/load_balancer_api.h" +#include "src/proto/grpc/lb/v0/load_balancer.pb.h" // C++ version + +namespace grpc { +namespace { + +using grpc::lb::v0::LoadBalanceRequest; +using grpc::lb::v0::LoadBalanceResponse; + +class GrpclbTest : public ::testing::Test {}; + +TEST_F(GrpclbTest, CreateRequest) { + const std::string service_name = "AServiceName"; + LoadBalanceRequest request; + grpc_grpclb_request* c_req = grpc_grpclb_request_create(service_name.c_str()); + gpr_slice slice = grpc_grpclb_request_encode(c_req); + const int num_bytes_written = GPR_SLICE_LENGTH(slice); + EXPECT_GT(num_bytes_written, 0); + request.ParseFromArray(GPR_SLICE_START_PTR(slice), num_bytes_written); + EXPECT_EQ(request.initial_request().name(), service_name); + gpr_slice_unref(slice); + grpc_grpclb_request_destroy(c_req); +} + +TEST_F(GrpclbTest, ParseResponse) { + LoadBalanceResponse response; + const std::string client_config_str = "I'm a client config"; + auto* initial_response = response.mutable_initial_response(); + initial_response->set_client_config(client_config_str); + auto* client_stats_report_interval = + initial_response->mutable_client_stats_report_interval(); + client_stats_report_interval->set_seconds(123); + client_stats_report_interval->set_nanos(456); + + const std::string encoded_response = response.SerializeAsString(); + gpr_slice encoded_slice = + gpr_slice_from_copied_string(encoded_response.c_str()); + grpc_grpclb_response* c_response = grpc_grpclb_response_parse(encoded_slice); + EXPECT_TRUE(c_response->has_initial_response); + EXPECT_TRUE(c_response->initial_response.has_client_config); + EXPECT_FALSE(c_response->initial_response.has_load_balancer_delegate); + EXPECT_TRUE(strcmp(c_response->initial_response.client_config, + client_config_str.c_str()) == 0); + EXPECT_EQ(c_response->initial_response.client_stats_report_interval.seconds, + 123); + EXPECT_EQ(c_response->initial_response.client_stats_report_interval.nanos, + 456); + gpr_slice_unref(encoded_slice); + grpc_grpclb_response_destroy(c_response); +} + +TEST_F(GrpclbTest, ParseResponseServerList) { + LoadBalanceResponse response; + auto* serverlist = response.mutable_server_list(); + auto* server = serverlist->add_servers(); + server->set_ip_address("127.0.0.1"); + server->set_port(12345); + server->set_drop_request(true); + server = response.mutable_server_list()->add_servers(); + server->set_ip_address("10.0.0.1"); + server->set_port(54321); + server->set_drop_request(false); + auto* expiration_interval = serverlist->mutable_expiration_interval(); + expiration_interval->set_seconds(888); + expiration_interval->set_nanos(999); + + const std::string encoded_response = response.SerializeAsString(); + gpr_slice encoded_slice = + gpr_slice_from_copied_string(encoded_response.c_str()); + grpc_grpclb_serverlist* c_serverlist = + grpc_grpclb_response_parse_serverlist(encoded_slice); + ASSERT_EQ(c_serverlist->num_servers, 2ul); + EXPECT_TRUE(c_serverlist->servers[0]->has_ip_address); + EXPECT_TRUE(strcmp(c_serverlist->servers[0]->ip_address, "127.0.0.1") == 0); + EXPECT_EQ(c_serverlist->servers[0]->port, 12345); + EXPECT_TRUE(c_serverlist->servers[0]->drop_request); + EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address); + EXPECT_TRUE(strcmp(c_serverlist->servers[1]->ip_address, "10.0.0.1") == 0); + EXPECT_EQ(c_serverlist->servers[1]->port, 54321); + EXPECT_FALSE(c_serverlist->servers[1]->drop_request); + + EXPECT_TRUE(c_serverlist->expiration_interval.has_seconds); + EXPECT_EQ(c_serverlist->expiration_interval.seconds, 888); + EXPECT_TRUE(c_serverlist->expiration_interval.has_nanos); + EXPECT_EQ(c_serverlist->expiration_interval.nanos, 999); + + gpr_slice_unref(encoded_slice); + grpc_grpclb_destroy_serverlist(c_serverlist); +} + +} // namespace +} // namespace grpc + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/cpp/interop/metrics_client.cc b/test/cpp/interop/metrics_client.cc index 0c140ffd85..bd48c7d4ef 100644 --- a/test/cpp/interop/metrics_client.cc +++ b/test/cpp/interop/metrics_client.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,39 +37,45 @@ #include <gflags/gflags.h> #include <grpc++/grpc++.h> -#include "test/cpp/util/metrics_server.h" -#include "test/cpp/util/test_config.h" #include "src/proto/grpc/testing/metrics.grpc.pb.h" #include "src/proto/grpc/testing/metrics.pb.h" +#include "test/cpp/util/metrics_server.h" +#include "test/cpp/util/test_config.h" DEFINE_string(metrics_server_address, "", "The metrics server addresses in the fomrat <hostname>:<port>"); +DEFINE_bool(total_only, false, + "If true, this prints only the total value of all gauges"); + +int kDeadlineSecs = 10; using grpc::testing::EmptyMessage; using grpc::testing::GaugeResponse; using grpc::testing::MetricsService; using grpc::testing::MetricsServiceImpl; -void PrintMetrics(const grpc::string& server_address) { - gpr_log(GPR_INFO, "creating a channel to %s", server_address.c_str()); - std::shared_ptr<grpc::Channel> channel( - grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials())); - - std::unique_ptr<MetricsService::Stub> stub(MetricsService::NewStub(channel)); - +// Prints the values of all Gauges (unless total_only is set to 'true' in which +// case this only prints the sum of all gauge values). +bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only) { grpc::ClientContext context; EmptyMessage message; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds(kDeadlineSecs); + + context.set_deadline(deadline); + std::unique_ptr<grpc::ClientReader<GaugeResponse>> reader( stub->GetAllGauges(&context, message)); GaugeResponse gauge_response; long overall_qps = 0; - int idx = 0; while (reader->Read(&gauge_response)) { if (gauge_response.value_case() == GaugeResponse::kLongValue) { - gpr_log(GPR_INFO, "Gauge: %d (%s: %ld)", ++idx, - gauge_response.name().c_str(), gauge_response.long_value()); + if (!total_only) { + gpr_log(GPR_INFO, "%s: %ld", gauge_response.name().c_str(), + gauge_response.long_value()); + } overall_qps += gauge_response.long_value(); } else { gpr_log(GPR_INFO, "Gauge %s is not a long value", @@ -77,12 +83,14 @@ void PrintMetrics(const grpc::string& server_address) { } } - gpr_log(GPR_INFO, "OVERALL: %ld", overall_qps); + gpr_log(GPR_INFO, "%ld", overall_qps); const grpc::Status status = reader->Finish(); if (!status.ok()) { gpr_log(GPR_ERROR, "Error in getting metrics from the client"); } + + return status.ok(); } int main(int argc, char** argv) { @@ -97,7 +105,12 @@ int main(int argc, char** argv) { return 1; } - PrintMetrics(FLAGS_metrics_server_address); + std::shared_ptr<grpc::Channel> channel(grpc::CreateChannel( + FLAGS_metrics_server_address, grpc::InsecureChannelCredentials())); + + if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only)) { + return 1; + } return 0; } diff --git a/test/cpp/interop/reconnect_interop_client.cc b/test/cpp/interop/reconnect_interop_client.cc index 1f6b352db1..79a60cc860 100644 --- a/test/cpp/interop/reconnect_interop_client.cc +++ b/test/cpp/interop/reconnect_interop_client.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/test/cpp/interop/server.cc b/test/cpp/interop/server_main.cc index 18ac35d551..18ac35d551 100644 --- a/test/cpp/interop/server.cc +++ b/test/cpp/interop/server_main.cc diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index c94a523fa1..2dc83f0f29 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -46,10 +46,10 @@ #include "src/proto/grpc/testing/payloads.grpc.pb.h" #include "src/proto/grpc/testing/services.grpc.pb.h" -#include "test/cpp/qps/limit_cores.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" -#include "test/cpp/qps/timer.h" +#include "test/cpp/qps/limit_cores.h" +#include "test/cpp/qps/usage_timer.h" #include "test/cpp/util/create_test_channel.h" namespace grpc { @@ -112,12 +112,12 @@ class ClientRequestCreator<ByteBuffer> { class Client { public: - Client() : timer_(new Timer), interarrival_timer_() {} + Client() : timer_(new UsageTimer), interarrival_timer_() {} virtual ~Client() {} ClientStats Mark(bool reset) { Histogram latencies; - Timer::Result timer_result; + UsageTimer::Result timer_result; // avoid std::vector for old compilers that expect a copy constructor if (reset) { @@ -125,7 +125,7 @@ class Client { for (size_t i = 0; i < threads_.size(); i++) { threads_[i]->BeginSwap(&to_merge[i]); } - std::unique_ptr<Timer> timer(new Timer); + std::unique_ptr<UsageTimer> timer(new UsageTimer); timer_.swap(timer); for (size_t i = 0; i < threads_.size(); i++) { threads_[i]->EndSwap(); @@ -294,7 +294,7 @@ class Client { }; std::vector<std::unique_ptr<Thread>> threads_; - std::unique_ptr<Timer> timer_; + std::unique_ptr<UsageTimer> timer_; InterarrivalTimer interarrival_timer_; std::vector<gpr_timespec> next_time_; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 9e8767d103..9e9da9909a 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -54,7 +54,7 @@ #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/client.h" -#include "test/cpp/qps/timer.h" +#include "test/cpp/qps/usage_timer.h" #include "test/cpp/util/create_test_channel.h" namespace grpc { @@ -107,14 +107,14 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { switch (next_state_) { case State::READY: - start_ = Timer::Now(); + start_ = UsageTimer::Now(); response_reader_ = start_req_(stub_, &context_, req_, cq_); response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); next_state_ = State::RESP_DONE; return true; case State::RESP_DONE: - hist->Add((Timer::Now() - start_) * 1e9); + hist->Add((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::INVALID; return false; @@ -287,8 +287,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - start_req_(start_req), - start_(Timer::Now()) {} + start_req_(start_req) {} ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {} void Start(CompletionQueue* cq) GRPC_OVERRIDE { cq_ = cq; @@ -314,7 +313,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { if (!ok) { return false; } - start_ = Timer::Now(); + start_ = UsageTimer::Now(); next_state_ = State::WRITE_DONE; stream_->Write(req_, ClientRpcContext::tag(this)); return true; @@ -327,7 +326,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { return true; break; case State::READ_DONE: - hist->Add((Timer::Now() - start_) * 1e9); + hist->Add((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::STREAM_IDLE; break; // loop around @@ -415,8 +414,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - start_req_(start_req), - start_(Timer::Now()) {} + start_req_(start_req) {} ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {} void Start(CompletionQueue* cq) GRPC_OVERRIDE { cq_ = cq; @@ -445,7 +443,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { if (!ok) { return false; } - start_ = Timer::Now(); + start_ = UsageTimer::Now(); next_state_ = State::WRITE_DONE; stream_->Write(req_, ClientRpcContext::tag(this)); return true; @@ -458,7 +456,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { return true; break; case State::READ_DONE: - hist->Add((Timer::Now() - start_) * 1e9); + hist->Add((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::STREAM_IDLE; break; // loop around diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index edfc246a25..4284e07bd4 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -58,7 +58,7 @@ #include "test/cpp/qps/client.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" -#include "test/cpp/qps/timer.h" +#include "test/cpp/qps/usage_timer.h" namespace grpc { namespace testing { @@ -104,12 +104,12 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - double start = Timer::Now(); + double start = UsageTimer::Now(); GPR_TIMER_SCOPE("SynchronousUnaryClient::ThreadFunc", 0); grpc::ClientContext context; grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); - histogram->Add((Timer::Now() - start) * 1e9); + histogram->Add((UsageTimer::Now() - start) * 1e9); return s.ok(); } }; @@ -143,10 +143,10 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0); - double start = Timer::Now(); + double start = UsageTimer::Now(); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { - histogram->Add((Timer::Now() - start) * 1e9); + histogram->Add((UsageTimer::Now() - start) * 1e9); return true; } return false; diff --git a/test/cpp/qps/qps-sweep.sh b/test/cpp/qps/qps-sweep.sh index 7a35788849..9d3f053a7b 100755 --- a/test/cpp/qps/qps-sweep.sh +++ b/test/cpp/qps/qps-sweep.sh @@ -72,7 +72,7 @@ for secure in true false; do --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ - --num_servers=1 --num_clients=0 |& tee /tmp/qps-test.$$ + --num_servers=1 --num_clients=0 2>&1 | tee /tmp/qps-test.$$ # Scenario 2b: QPS with a single server core "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc index 0ac41d9f96..27f266b32b 100644 --- a/test/cpp/qps/qps_openloop_test.cc +++ b/test/cpp/qps/qps_openloop_test.cc @@ -35,6 +35,7 @@ #include <grpc/support/log.h> +#include "test/core/util/test_config.h" #include "test/cpp/qps/driver.h" #include "test/cpp/qps/report.h" #include "test/cpp/util/benchmark_config.h" @@ -55,11 +56,11 @@ static void RunQPS() { client_config.set_async_client_threads(8); client_config.set_rpc_type(STREAMING); client_config.mutable_load_params()->mutable_poisson()->set_offered_load( - 1000.0); + 1000.0 / GRPC_TEST_SLOWDOWN_FACTOR); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); - server_config.set_async_server_threads(4); + server_config.set_async_server_threads(8); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index 9442017ddf..b83e9d1dd7 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -101,6 +101,19 @@ static std::unique_ptr<Server> CreateServer(const ServerConfig& config) { abort(); } +class ScopedProfile GRPC_FINAL { + public: + ScopedProfile(const char* filename, bool enable) : enable_(enable) { + if (enable_) grpc_profiler_start(filename); + } + ~ScopedProfile() { + if (enable_) grpc_profiler_stop(); + } + + private: + const bool enable_; +}; + class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { public: WorkerServiceImpl(int server_port, QpsWorker* worker) @@ -114,9 +127,8 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { return Status(StatusCode::RESOURCE_EXHAUSTED, ""); } - grpc_profiler_start("qps_client.prof"); + ScopedProfile profile("qps_client.prof", false); Status ret = RunClientBody(ctx, stream); - grpc_profiler_stop(); return ret; } @@ -128,9 +140,8 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { return Status(StatusCode::RESOURCE_EXHAUSTED, ""); } - grpc_profiler_start("qps_server.prof"); + ScopedProfile profile("qps_server.prof", false); Status ret = RunServerBody(ctx, stream); - grpc_profiler_stop(); return ret; } diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 94a6f8acfa..de46452c3d 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -43,14 +43,14 @@ #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" #include "test/cpp/qps/limit_cores.h" -#include "test/cpp/qps/timer.h" +#include "test/cpp/qps/usage_timer.h" namespace grpc { namespace testing { class Server { public: - explicit Server(const ServerConfig& config) : timer_(new Timer) { + explicit Server(const ServerConfig& config) : timer_(new UsageTimer) { cores_ = LimitCores(config.core_list().data(), config.core_list_size()); if (config.port()) { port_ = config.port(); @@ -62,9 +62,9 @@ class Server { virtual ~Server() {} ServerStats Mark(bool reset) { - Timer::Result timer_result; + UsageTimer::Result timer_result; if (reset) { - std::unique_ptr<Timer> timer(new Timer); + std::unique_ptr<UsageTimer> timer(new UsageTimer); timer.swap(timer_); timer_result = timer->Mark(); } else { @@ -108,7 +108,7 @@ class Server { private: int port_; int cores_; - std::unique_ptr<Timer> timer_; + std::unique_ptr<UsageTimer> timer_; }; std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config); diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index 4b778820d0..b7682f5763 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -34,18 +34,18 @@ #include <thread> #include <gflags/gflags.h> +#include <grpc++/security/server_credentials.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> -#include <grpc++/server.h> -#include <grpc++/server_builder.h> -#include <grpc++/server_context.h> -#include <grpc++/security/server_credentials.h> -#include "test/cpp/qps/server.h" -#include "test/cpp/qps/timer.h" #include "src/proto/grpc/testing/services.grpc.pb.h" +#include "test/cpp/qps/server.h" +#include "test/cpp/qps/usage_timer.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/timer.cc b/test/cpp/qps/usage_timer.cc index 3ec7f49f83..6663a9ac10 100644 --- a/test/cpp/qps/timer.cc +++ b/test/cpp/qps/usage_timer.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,15 +31,15 @@ * */ -#include "test/cpp/qps/timer.h" +#include "test/cpp/qps/usage_timer.h" -#include <sys/time.h> -#include <sys/resource.h> #include <grpc/support/time.h> +#include <sys/resource.h> +#include <sys/time.h> -Timer::Timer() : start_(Sample()) {} +UsageTimer::UsageTimer() : start_(Sample()) {} -double Timer::Now() { +double UsageTimer::Now() { auto ts = gpr_now(GPR_CLOCK_REALTIME); return ts.tv_sec + 1e-9 * ts.tv_nsec; } @@ -48,7 +48,7 @@ static double time_double(struct timeval* tv) { return tv->tv_sec + 1e-6 * tv->tv_usec; } -Timer::Result Timer::Sample() { +UsageTimer::Result UsageTimer::Sample() { struct rusage usage; struct timeval tv; gettimeofday(&tv, NULL); @@ -61,7 +61,7 @@ Timer::Result Timer::Sample() { return r; } -Timer::Result Timer::Mark() const { +UsageTimer::Result UsageTimer::Mark() const { Result s = Sample(); Result r; r.wall = s.wall - start_.wall; diff --git a/test/cpp/qps/timer.h b/test/cpp/qps/usage_timer.h index d1aee1a9d1..d19f820564 100644 --- a/test/cpp/qps/timer.h +++ b/test/cpp/qps/usage_timer.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,12 +31,12 @@ * */ -#ifndef TEST_QPS_TIMER_H -#define TEST_QPS_TIMER_H +#ifndef TEST_QPS_USAGE_TIMER_H +#define TEST_QPS_USAGE_TIMER_H -class Timer { +class UsageTimer { public: - Timer(); + UsageTimer(); struct Result { double wall; diff --git a/test/cpp/util/metrics_server.cc b/test/cpp/util/metrics_server.cc index 07978d0bdb..34d51eb316 100644 --- a/test/cpp/util/metrics_server.cc +++ b/test/cpp/util/metrics_server.cc @@ -57,7 +57,7 @@ long Gauge::Get() { grpc::Status MetricsServiceImpl::GetAllGauges( ServerContext* context, const EmptyMessage* request, ServerWriter<GaugeResponse>* writer) { - gpr_log(GPR_INFO, "GetAllGauges called"); + gpr_log(GPR_DEBUG, "GetAllGauges called"); std::lock_guard<std::mutex> lock(mu_); for (auto it = gauges_.begin(); it != gauges_.end(); it++) { diff --git a/test/cpp/util/test_credentials_provider.cc b/test/cpp/util/test_credentials_provider.cc index 1086e14258..e314fd6d75 100644 --- a/test/cpp/util/test_credentials_provider.cc +++ b/test/cpp/util/test_credentials_provider.cc @@ -34,48 +34,140 @@ #include "test/cpp/util/test_credentials_provider.h" +#include <unordered_map> + +#include <grpc/support/sync.h> +#include <grpc++/impl/sync.h> + #include "test/core/end2end/data/ssl_test_data.h" +namespace { + +using grpc::ChannelArguments; +using grpc::ChannelCredentials; +using grpc::InsecureChannelCredentials; +using grpc::InsecureServerCredentials; +using grpc::ServerCredentials; +using grpc::SslCredentialsOptions; +using grpc::SslServerCredentialsOptions; +using grpc::testing::CredentialTypeProvider; + +// Provide test credentials. Thread-safe. +class CredentialsProvider { + public: + virtual ~CredentialsProvider() {} + + virtual void AddSecureType( + const grpc::string& type, + std::unique_ptr<CredentialTypeProvider> type_provider) = 0; + virtual std::shared_ptr<ChannelCredentials> GetChannelCredentials( + const grpc::string& type, ChannelArguments* args) = 0; + virtual std::shared_ptr<ServerCredentials> GetServerCredentials( + const grpc::string& type) = 0; + virtual std::vector<grpc::string> GetSecureCredentialsTypeList() = 0; +}; + +class DefaultCredentialsProvider : public CredentialsProvider { + public: + ~DefaultCredentialsProvider() override {} + + void AddSecureType( + const grpc::string& type, + std::unique_ptr<CredentialTypeProvider> type_provider) override { + // This clobbers any existing entry for type, except the defaults, which + // can't be clobbered. + grpc::unique_lock<grpc::mutex> lock(mu_); + added_secure_types_[type] = std::move(type_provider); + } + + std::shared_ptr<ChannelCredentials> GetChannelCredentials( + const grpc::string& type, ChannelArguments* args) override { + if (type == grpc::testing::kInsecureCredentialsType) { + return InsecureChannelCredentials(); + } else if (type == grpc::testing::kTlsCredentialsType) { + SslCredentialsOptions ssl_opts = {test_root_cert, "", ""}; + args->SetSslTargetNameOverride("foo.test.google.fr"); + return SslCredentials(ssl_opts); + } else { + grpc::unique_lock<grpc::mutex> lock(mu_); + auto it(added_secure_types_.find(type)); + if (it == added_secure_types_.end()) { + gpr_log(GPR_ERROR, "Unsupported credentials type %s.", type.c_str()); + return nullptr; + } + return it->second->GetChannelCredentials(args); + } + } + + std::shared_ptr<ServerCredentials> GetServerCredentials( + const grpc::string& type) override { + if (type == grpc::testing::kInsecureCredentialsType) { + return InsecureServerCredentials(); + } else if (type == grpc::testing::kTlsCredentialsType) { + SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key, + test_server1_cert}; + SslServerCredentialsOptions ssl_opts; + ssl_opts.pem_root_certs = ""; + ssl_opts.pem_key_cert_pairs.push_back(pkcp); + return SslServerCredentials(ssl_opts); + } else { + grpc::unique_lock<grpc::mutex> lock(mu_); + auto it(added_secure_types_.find(type)); + if (it == added_secure_types_.end()) { + gpr_log(GPR_ERROR, "Unsupported credentials type %s.", type.c_str()); + return nullptr; + } + return it->second->GetServerCredentials(); + } + } + std::vector<grpc::string> GetSecureCredentialsTypeList() override { + std::vector<grpc::string> types; + types.push_back(grpc::testing::kTlsCredentialsType); + grpc::unique_lock<grpc::mutex> lock(mu_); + for (const auto& type_pair : added_secure_types_) { + types.push_back(type_pair.first); + } + return types; + } + + private: + grpc::mutex mu_; + std::unordered_map<grpc::string, std::unique_ptr<CredentialTypeProvider> > + added_secure_types_; +}; + +gpr_once g_once_init_provider = GPR_ONCE_INIT; +CredentialsProvider* g_provider = nullptr; + +void CreateDefaultProvider() { g_provider = new DefaultCredentialsProvider; } + +CredentialsProvider* GetProvider() { + gpr_once_init(&g_once_init_provider, &CreateDefaultProvider); + return g_provider; +} + +} // namespace + namespace grpc { namespace testing { -const char kTlsCredentialsType[] = "TLS_CREDENTIALS"; +void AddSecureType(const grpc::string& type, + std::unique_ptr<CredentialTypeProvider> type_provider) { + GetProvider()->AddSecureType(type, std::move(type_provider)); +} std::shared_ptr<ChannelCredentials> GetChannelCredentials( const grpc::string& type, ChannelArguments* args) { - if (type == kInsecureCredentialsType) { - return InsecureChannelCredentials(); - } else if (type == kTlsCredentialsType) { - SslCredentialsOptions ssl_opts = {test_root_cert, "", ""}; - args->SetSslTargetNameOverride("foo.test.google.fr"); - return SslCredentials(ssl_opts); - } else { - gpr_log(GPR_ERROR, "Unsupported credentials type %s.", type.c_str()); - } - return nullptr; + return GetProvider()->GetChannelCredentials(type, args); } std::shared_ptr<ServerCredentials> GetServerCredentials( const grpc::string& type) { - if (type == kInsecureCredentialsType) { - return InsecureServerCredentials(); - } else if (type == kTlsCredentialsType) { - SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key, - test_server1_cert}; - SslServerCredentialsOptions ssl_opts; - ssl_opts.pem_root_certs = ""; - ssl_opts.pem_key_cert_pairs.push_back(pkcp); - return SslServerCredentials(ssl_opts); - } else { - gpr_log(GPR_ERROR, "Unsupported credentials type %s.", type.c_str()); - } - return nullptr; + return GetProvider()->GetServerCredentials(type); } std::vector<grpc::string> GetSecureCredentialsTypeList() { - std::vector<grpc::string> types; - types.push_back(kTlsCredentialsType); - return types; + return GetProvider()->GetSecureCredentialsTypeList(); } } // namespace testing diff --git a/test/cpp/util/test_credentials_provider.h b/test/cpp/util/test_credentials_provider.h index f7253051a9..50fadb53a2 100644 --- a/test/cpp/util/test_credentials_provider.h +++ b/test/cpp/util/test_credentials_provider.h @@ -44,6 +44,23 @@ namespace grpc { namespace testing { const char kInsecureCredentialsType[] = "INSECURE_CREDENTIALS"; +const char kTlsCredentialsType[] = "TLS_CREDENTIALS"; + +// Provide test credentials of a particular type. +class CredentialTypeProvider { + public: + virtual ~CredentialTypeProvider() {} + + virtual std::shared_ptr<ChannelCredentials> GetChannelCredentials( + ChannelArguments* args) = 0; + virtual std::shared_ptr<ServerCredentials> GetServerCredentials() = 0; +}; + +// Add a secure type in addition to the defaults above +// (kInsecureCredentialsType, kTlsCredentialsType) that can be returned from the +// functions below. +void AddSecureType(const grpc::string& type, + std::unique_ptr<CredentialTypeProvider> type_provider); // Provide channel credentials according to the given type. Alter the channel // arguments if needed. |