diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/core/transport/chttp2/context_list_test.cc | 7 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/BUILD | 7 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_timer.cc | 118 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 89 |
4 files changed, 188 insertions, 33 deletions
diff --git a/test/core/transport/chttp2/context_list_test.cc b/test/core/transport/chttp2/context_list_test.cc index edbe658a89..0379eaaee4 100644 --- a/test/core/transport/chttp2/context_list_test.cc +++ b/test/core/transport/chttp2/context_list_test.cc @@ -36,8 +36,12 @@ namespace { const uint32_t kByteOffset = 123; -void TestExecuteFlushesListVerifier(void* arg, grpc_core::Timestamps* ts) { +void* DummyArgsCopier(void* arg) { return arg; } + +void TestExecuteFlushesListVerifier(void* arg, grpc_core::Timestamps* ts, + grpc_error* error) { ASSERT_NE(arg, nullptr); + EXPECT_EQ(error, GRPC_ERROR_NONE); EXPECT_EQ(ts->byte_offset, kByteOffset); gpr_atm* done = reinterpret_cast<gpr_atm*>(arg); gpr_atm_rel_store(done, static_cast<gpr_atm>(1)); @@ -52,6 +56,7 @@ void discard_write(grpc_slice slice) {} TEST(ContextList, ExecuteFlushesList) { grpc_core::ContextList* list = nullptr; grpc_http2_set_write_timestamps_callback(TestExecuteFlushesListVerifier); + grpc_http2_set_fn_get_copied_context(DummyArgsCopier); const int kNumElems = 5; grpc_core::ExecCtx exec_ctx; grpc_stream_refcount ref; diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index b5890bece7..a29462f78f 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -189,3 +189,10 @@ grpc_cc_binary( "//src/proto/grpc/testing:echo_proto", ], ) + +grpc_cc_binary( + name = "bm_timer", + testonly = 1, + srcs = ["bm_timer.cc"], + deps = [":helpers"], +) diff --git a/test/cpp/microbenchmarks/bm_timer.cc b/test/cpp/microbenchmarks/bm_timer.cc new file mode 100644 index 0000000000..f5a411251b --- /dev/null +++ b/test/cpp/microbenchmarks/bm_timer.cc @@ -0,0 +1,118 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <benchmark/benchmark.h> +#include <string.h> +#include <atomic> +#include <vector> + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include "test/cpp/microbenchmarks/helpers.h" +#include "test/cpp/util/test_config.h" + +#include "src/core/lib/iomgr/timer.h" + +namespace grpc { +namespace testing { + +auto& force_library_initialization = Library::get(); + +struct TimerClosure { + grpc_timer timer; + grpc_closure closure; +}; + +static void BM_InitCancelTimer(benchmark::State& state) { + constexpr int kTimerCount = 1024; + TrackCounters track_counters; + grpc_core::ExecCtx exec_ctx; + std::vector<TimerClosure> timer_closures(kTimerCount); + int i = 0; + while (state.KeepRunning()) { + TimerClosure* timer_closure = &timer_closures[i++ % kTimerCount]; + GRPC_CLOSURE_INIT(&timer_closure->closure, + [](void* /*args*/, grpc_error* /*err*/) {}, nullptr, + grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_closure->timer, GRPC_MILLIS_INF_FUTURE, + &timer_closure->closure); + grpc_timer_cancel(&timer_closure->timer); + exec_ctx.Flush(); + } + track_counters.Finish(state); +} +BENCHMARK(BM_InitCancelTimer); + +static void BM_TimerBatch(benchmark::State& state) { + constexpr int kTimerCount = 1024; + const bool check = state.range(0); + const bool reverse = state.range(1); + + const grpc_millis start = + reverse ? GRPC_MILLIS_INF_FUTURE : GRPC_MILLIS_INF_FUTURE - kTimerCount; + const grpc_millis end = + reverse ? GRPC_MILLIS_INF_FUTURE - kTimerCount : GRPC_MILLIS_INF_FUTURE; + const grpc_millis increment = reverse ? -1 : 1; + + TrackCounters track_counters; + grpc_core::ExecCtx exec_ctx; + std::vector<TimerClosure> timer_closures(kTimerCount); + while (state.KeepRunning()) { + for (grpc_millis deadline = start; deadline != end; deadline += increment) { + TimerClosure* timer_closure = &timer_closures[deadline % kTimerCount]; + GRPC_CLOSURE_INIT(&timer_closure->closure, + [](void* /*args*/, grpc_error* /*err*/) {}, nullptr, + grpc_schedule_on_exec_ctx); + + grpc_timer_init(&timer_closure->timer, deadline, &timer_closure->closure); + } + if (check) { + grpc_millis next; + grpc_timer_check(&next); + } + for (grpc_millis deadline = start; deadline != end; deadline += increment) { + TimerClosure* timer_closure = &timer_closures[deadline % kTimerCount]; + grpc_timer_cancel(&timer_closure->timer); + } + exec_ctx.Flush(); + } + track_counters.Finish(state); +} +BENCHMARK(BM_TimerBatch) + ->Args({/*check=*/false, /*reverse=*/false}) + ->Args({/*check=*/false, /*reverse=*/true}) + ->Args({/*check=*/true, /*reverse=*/false}) + ->Args({/*check=*/true, /*reverse=*/true}) + ->ThreadRange(1, 128); + +} // namespace testing +} // namespace grpc + +// Some distros have RunSpecifiedBenchmarks under the benchmark namespace, +// and others do not. This allows us to support both modes. +namespace benchmark { +void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } +} // namespace benchmark + +int main(int argc, char** argv) { + ::benchmark::Initialize(&argc, argv); + ::grpc::testing::InitTest(&argc, &argv, false); + benchmark::RunTheBenchmarksNamespaced(); + return 0; +} diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 668d941916..73f91eed2d 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -429,13 +429,7 @@ class ClientImpl : public Client { config.server_targets(i % config.server_targets_size()), config, create_stub_, i); } - std::vector<std::unique_ptr<std::thread>> connecting_threads; - for (auto& c : channels_) { - connecting_threads.emplace_back(c.WaitForReady()); - } - for (auto& t : connecting_threads) { - t->join(); - } + WaitForChannelsToConnect(); median_latency_collection_interval_seconds_ = config.median_latency_collection_interval_millis() / 1e3; ClientRequestCreator<RequestType> create_req(&request_, @@ -443,6 +437,61 @@ class ClientImpl : public Client { } virtual ~ClientImpl() {} + void WaitForChannelsToConnect() { + int connect_deadline_seconds = 10; + /* Allow optionally overriding connect_deadline in order + * to deal with benchmark environments in which the server + * can take a long time to become ready. */ + char* channel_connect_timeout_str = + gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT"); + if (channel_connect_timeout_str != nullptr && + strcmp(channel_connect_timeout_str, "") != 0) { + connect_deadline_seconds = atoi(channel_connect_timeout_str); + } + gpr_log(GPR_INFO, + "Waiting for up to %d seconds for all channels to connect", + connect_deadline_seconds); + gpr_free(channel_connect_timeout_str); + gpr_timespec connect_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN)); + CompletionQueue cq; + size_t num_remaining = 0; + for (auto& c : channels_) { + if (!c.is_inproc()) { + Channel* channel = c.get_channel(); + grpc_connectivity_state last_observed = channel->GetState(true); + if (last_observed == GRPC_CHANNEL_READY) { + gpr_log(GPR_INFO, "Channel %p connected!", channel); + } else { + num_remaining++; + channel->NotifyOnStateChange(last_observed, connect_deadline, &cq, + channel); + } + } + } + while (num_remaining > 0) { + bool ok = false; + void* tag = nullptr; + cq.Next(&tag, &ok); + Channel* channel = static_cast<Channel*>(tag); + if (!ok) { + gpr_log(GPR_ERROR, "Channel %p failed to connect within the deadline", + channel); + abort(); + } else { + grpc_connectivity_state last_observed = channel->GetState(true); + if (last_observed == GRPC_CHANNEL_READY) { + gpr_log(GPR_INFO, "Channel %p connected!", channel); + num_remaining--; + } else { + channel->NotifyOnStateChange(last_observed, connect_deadline, &cq, + channel); + } + } + } + } + protected: const int cores_; RequestType request_; @@ -485,31 +534,7 @@ class ClientImpl : public Client { } Channel* get_channel() { return channel_.get(); } StubType* get_stub() { return stub_.get(); } - - std::unique_ptr<std::thread> WaitForReady() { - return std::unique_ptr<std::thread>(new std::thread([this]() { - if (!is_inproc_) { - int connect_deadline = 10; - /* Allow optionally overriding connect_deadline in order - * to deal with benchmark environments in which the server - * can take a long time to become ready. */ - char* channel_connect_timeout_str = - gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT"); - if (channel_connect_timeout_str != nullptr && - strcmp(channel_connect_timeout_str, "") != 0) { - connect_deadline = atoi(channel_connect_timeout_str); - } - gpr_log(GPR_INFO, - "Waiting for up to %d seconds for the channel %p to connect", - connect_deadline, channel_.get()); - gpr_free(channel_connect_timeout_str); - GPR_ASSERT(channel_->WaitForConnected(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(connect_deadline, GPR_TIMESPAN)))); - gpr_log(GPR_INFO, "Channel %p connected!", channel_.get()); - } - })); - } + bool is_inproc() { return is_inproc_; } private: void set_channel_args(const ClientConfig& config, ChannelArguments* args) { |