aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/core/transport/chttp2/context_list_test.cc7
-rw-r--r--test/cpp/microbenchmarks/BUILD7
-rw-r--r--test/cpp/microbenchmarks/bm_timer.cc118
-rw-r--r--test/cpp/qps/client.h89
4 files changed, 188 insertions, 33 deletions
diff --git a/test/core/transport/chttp2/context_list_test.cc b/test/core/transport/chttp2/context_list_test.cc
index edbe658a89..0379eaaee4 100644
--- a/test/core/transport/chttp2/context_list_test.cc
+++ b/test/core/transport/chttp2/context_list_test.cc
@@ -36,8 +36,12 @@ namespace {
const uint32_t kByteOffset = 123;
-void TestExecuteFlushesListVerifier(void* arg, grpc_core::Timestamps* ts) {
+void* DummyArgsCopier(void* arg) { return arg; }
+
+void TestExecuteFlushesListVerifier(void* arg, grpc_core::Timestamps* ts,
+ grpc_error* error) {
ASSERT_NE(arg, nullptr);
+ EXPECT_EQ(error, GRPC_ERROR_NONE);
EXPECT_EQ(ts->byte_offset, kByteOffset);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
@@ -52,6 +56,7 @@ void discard_write(grpc_slice slice) {}
TEST(ContextList, ExecuteFlushesList) {
grpc_core::ContextList* list = nullptr;
grpc_http2_set_write_timestamps_callback(TestExecuteFlushesListVerifier);
+ grpc_http2_set_fn_get_copied_context(DummyArgsCopier);
const int kNumElems = 5;
grpc_core::ExecCtx exec_ctx;
grpc_stream_refcount ref;
diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD
index b5890bece7..a29462f78f 100644
--- a/test/cpp/microbenchmarks/BUILD
+++ b/test/cpp/microbenchmarks/BUILD
@@ -189,3 +189,10 @@ grpc_cc_binary(
"//src/proto/grpc/testing:echo_proto",
],
)
+
+grpc_cc_binary(
+ name = "bm_timer",
+ testonly = 1,
+ srcs = ["bm_timer.cc"],
+ deps = [":helpers"],
+)
diff --git a/test/cpp/microbenchmarks/bm_timer.cc b/test/cpp/microbenchmarks/bm_timer.cc
new file mode 100644
index 0000000000..f5a411251b
--- /dev/null
+++ b/test/cpp/microbenchmarks/bm_timer.cc
@@ -0,0 +1,118 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <benchmark/benchmark.h>
+#include <string.h>
+#include <atomic>
+#include <vector>
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include "test/cpp/microbenchmarks/helpers.h"
+#include "test/cpp/util/test_config.h"
+
+#include "src/core/lib/iomgr/timer.h"
+
+namespace grpc {
+namespace testing {
+
+auto& force_library_initialization = Library::get();
+
+struct TimerClosure {
+ grpc_timer timer;
+ grpc_closure closure;
+};
+
+static void BM_InitCancelTimer(benchmark::State& state) {
+ constexpr int kTimerCount = 1024;
+ TrackCounters track_counters;
+ grpc_core::ExecCtx exec_ctx;
+ std::vector<TimerClosure> timer_closures(kTimerCount);
+ int i = 0;
+ while (state.KeepRunning()) {
+ TimerClosure* timer_closure = &timer_closures[i++ % kTimerCount];
+ GRPC_CLOSURE_INIT(&timer_closure->closure,
+ [](void* /*args*/, grpc_error* /*err*/) {}, nullptr,
+ grpc_schedule_on_exec_ctx);
+ grpc_timer_init(&timer_closure->timer, GRPC_MILLIS_INF_FUTURE,
+ &timer_closure->closure);
+ grpc_timer_cancel(&timer_closure->timer);
+ exec_ctx.Flush();
+ }
+ track_counters.Finish(state);
+}
+BENCHMARK(BM_InitCancelTimer);
+
+static void BM_TimerBatch(benchmark::State& state) {
+ constexpr int kTimerCount = 1024;
+ const bool check = state.range(0);
+ const bool reverse = state.range(1);
+
+ const grpc_millis start =
+ reverse ? GRPC_MILLIS_INF_FUTURE : GRPC_MILLIS_INF_FUTURE - kTimerCount;
+ const grpc_millis end =
+ reverse ? GRPC_MILLIS_INF_FUTURE - kTimerCount : GRPC_MILLIS_INF_FUTURE;
+ const grpc_millis increment = reverse ? -1 : 1;
+
+ TrackCounters track_counters;
+ grpc_core::ExecCtx exec_ctx;
+ std::vector<TimerClosure> timer_closures(kTimerCount);
+ while (state.KeepRunning()) {
+ for (grpc_millis deadline = start; deadline != end; deadline += increment) {
+ TimerClosure* timer_closure = &timer_closures[deadline % kTimerCount];
+ GRPC_CLOSURE_INIT(&timer_closure->closure,
+ [](void* /*args*/, grpc_error* /*err*/) {}, nullptr,
+ grpc_schedule_on_exec_ctx);
+
+ grpc_timer_init(&timer_closure->timer, deadline, &timer_closure->closure);
+ }
+ if (check) {
+ grpc_millis next;
+ grpc_timer_check(&next);
+ }
+ for (grpc_millis deadline = start; deadline != end; deadline += increment) {
+ TimerClosure* timer_closure = &timer_closures[deadline % kTimerCount];
+ grpc_timer_cancel(&timer_closure->timer);
+ }
+ exec_ctx.Flush();
+ }
+ track_counters.Finish(state);
+}
+BENCHMARK(BM_TimerBatch)
+ ->Args({/*check=*/false, /*reverse=*/false})
+ ->Args({/*check=*/false, /*reverse=*/true})
+ ->Args({/*check=*/true, /*reverse=*/false})
+ ->Args({/*check=*/true, /*reverse=*/true})
+ ->ThreadRange(1, 128);
+
+} // namespace testing
+} // namespace grpc
+
+// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
+// and others do not. This allows us to support both modes.
+namespace benchmark {
+void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
+} // namespace benchmark
+
+int main(int argc, char** argv) {
+ ::benchmark::Initialize(&argc, argv);
+ ::grpc::testing::InitTest(&argc, &argv, false);
+ benchmark::RunTheBenchmarksNamespaced();
+ return 0;
+}
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 668d941916..73f91eed2d 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -429,13 +429,7 @@ class ClientImpl : public Client {
config.server_targets(i % config.server_targets_size()), config,
create_stub_, i);
}
- std::vector<std::unique_ptr<std::thread>> connecting_threads;
- for (auto& c : channels_) {
- connecting_threads.emplace_back(c.WaitForReady());
- }
- for (auto& t : connecting_threads) {
- t->join();
- }
+ WaitForChannelsToConnect();
median_latency_collection_interval_seconds_ =
config.median_latency_collection_interval_millis() / 1e3;
ClientRequestCreator<RequestType> create_req(&request_,
@@ -443,6 +437,61 @@ class ClientImpl : public Client {
}
virtual ~ClientImpl() {}
+ void WaitForChannelsToConnect() {
+ int connect_deadline_seconds = 10;
+ /* Allow optionally overriding connect_deadline in order
+ * to deal with benchmark environments in which the server
+ * can take a long time to become ready. */
+ char* channel_connect_timeout_str =
+ gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
+ if (channel_connect_timeout_str != nullptr &&
+ strcmp(channel_connect_timeout_str, "") != 0) {
+ connect_deadline_seconds = atoi(channel_connect_timeout_str);
+ }
+ gpr_log(GPR_INFO,
+ "Waiting for up to %d seconds for all channels to connect",
+ connect_deadline_seconds);
+ gpr_free(channel_connect_timeout_str);
+ gpr_timespec connect_deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN));
+ CompletionQueue cq;
+ size_t num_remaining = 0;
+ for (auto& c : channels_) {
+ if (!c.is_inproc()) {
+ Channel* channel = c.get_channel();
+ grpc_connectivity_state last_observed = channel->GetState(true);
+ if (last_observed == GRPC_CHANNEL_READY) {
+ gpr_log(GPR_INFO, "Channel %p connected!", channel);
+ } else {
+ num_remaining++;
+ channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
+ channel);
+ }
+ }
+ }
+ while (num_remaining > 0) {
+ bool ok = false;
+ void* tag = nullptr;
+ cq.Next(&tag, &ok);
+ Channel* channel = static_cast<Channel*>(tag);
+ if (!ok) {
+ gpr_log(GPR_ERROR, "Channel %p failed to connect within the deadline",
+ channel);
+ abort();
+ } else {
+ grpc_connectivity_state last_observed = channel->GetState(true);
+ if (last_observed == GRPC_CHANNEL_READY) {
+ gpr_log(GPR_INFO, "Channel %p connected!", channel);
+ num_remaining--;
+ } else {
+ channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
+ channel);
+ }
+ }
+ }
+ }
+
protected:
const int cores_;
RequestType request_;
@@ -485,31 +534,7 @@ class ClientImpl : public Client {
}
Channel* get_channel() { return channel_.get(); }
StubType* get_stub() { return stub_.get(); }
-
- std::unique_ptr<std::thread> WaitForReady() {
- return std::unique_ptr<std::thread>(new std::thread([this]() {
- if (!is_inproc_) {
- int connect_deadline = 10;
- /* Allow optionally overriding connect_deadline in order
- * to deal with benchmark environments in which the server
- * can take a long time to become ready. */
- char* channel_connect_timeout_str =
- gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
- if (channel_connect_timeout_str != nullptr &&
- strcmp(channel_connect_timeout_str, "") != 0) {
- connect_deadline = atoi(channel_connect_timeout_str);
- }
- gpr_log(GPR_INFO,
- "Waiting for up to %d seconds for the channel %p to connect",
- connect_deadline, channel_.get());
- gpr_free(channel_connect_timeout_str);
- GPR_ASSERT(channel_->WaitForConnected(gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(connect_deadline, GPR_TIMESPAN))));
- gpr_log(GPR_INFO, "Channel %p connected!", channel_.get());
- }
- }));
- }
+ bool is_inproc() { return is_inproc_; }
private:
void set_channel_args(const ClientConfig& config, ChannelArguments* args) {