aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/lib/channel/channelz.cc73
-rw-r--r--src/core/lib/channel/channelz.h24
-rw-r--r--src/core/lib/iomgr/exec_ctx.h7
-rw-r--r--test/core/channel/channelz_test.cc3
4 files changed, 74 insertions, 33 deletions
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc
index 339c827525..e9c29489a2 100644
--- a/src/core/lib/channel/channelz.cc
+++ b/src/core/lib/channel/channelz.cc
@@ -34,6 +34,7 @@
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/error_utils.h"
@@ -55,35 +56,77 @@ char* BaseNode::RenderJsonString() {
}
CallCountingHelper::CallCountingHelper() {
- gpr_atm_no_barrier_store(&last_call_started_millis_,
- (gpr_atm)ExecCtx::Get()->Now());
+ num_cores_ = GPR_MAX(1, gpr_cpu_num_cores());
+ per_cpu_counter_data_storage_ =
+ static_cast<CounterData*>(gpr_zalloc(sizeof(CounterData) * num_cores_));
}
-CallCountingHelper::~CallCountingHelper() {}
+CallCountingHelper::~CallCountingHelper() {
+ gpr_free(per_cpu_counter_data_storage_);
+}
void CallCountingHelper::RecordCallStarted() {
- gpr_atm_no_barrier_fetch_add(&calls_started_, static_cast<gpr_atm>(1));
- gpr_atm_no_barrier_store(&last_call_started_millis_,
- (gpr_atm)ExecCtx::Get()->Now());
+ gpr_atm_no_barrier_fetch_add(
+ &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
+ .calls_started_,
+ static_cast<gpr_atm>(1));
+ gpr_atm_no_barrier_store(
+ &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
+ .last_call_started_millis_,
+ (gpr_atm)ExecCtx::Get()->Now());
+}
+
+void CallCountingHelper::RecordCallFailed() {
+ gpr_atm_no_barrier_fetch_add(
+ &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
+ .calls_failed_,
+ static_cast<gpr_atm>(1));
+}
+
+void CallCountingHelper::RecordCallSucceeded() {
+ gpr_atm_no_barrier_fetch_add(
+ &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()]
+ .calls_succeeded_,
+ static_cast<gpr_atm>(1));
+}
+
+CallCountingHelper::CounterData CallCountingHelper::Collect() {
+ CounterData out;
+ memset(&out, 0, sizeof(out));
+ for (size_t core = 0; core < num_cores_; ++core) {
+ out.calls_started_ += gpr_atm_no_barrier_load(
+ &per_cpu_counter_data_storage_[core].calls_started_);
+ out.calls_succeeded_ += gpr_atm_no_barrier_load(
+ &per_cpu_counter_data_storage_[core].calls_succeeded_);
+ out.calls_failed_ += gpr_atm_no_barrier_load(
+ &per_cpu_counter_data_storage_[core].calls_failed_);
+ gpr_atm last_call = gpr_atm_no_barrier_load(
+ &per_cpu_counter_data_storage_[core].last_call_started_millis_);
+ if (last_call > out.last_call_started_millis_) {
+ out.last_call_started_millis_ = last_call;
+ }
+ }
+ return out;
}
void CallCountingHelper::PopulateCallCounts(grpc_json* json) {
grpc_json* json_iterator = nullptr;
- if (calls_started_ != 0) {
+ CounterData data = Collect();
+ if (data.calls_started_ != 0) {
json_iterator = grpc_json_add_number_string_child(
- json, json_iterator, "callsStarted", calls_started_);
+ json, json_iterator, "callsStarted", data.calls_started_);
}
- if (calls_succeeded_ != 0) {
+ if (data.calls_succeeded_ != 0) {
json_iterator = grpc_json_add_number_string_child(
- json, json_iterator, "callsSucceeded", calls_succeeded_);
+ json, json_iterator, "callsSucceeded", data.calls_succeeded_);
}
- if (calls_failed_) {
+ if (data.calls_failed_) {
json_iterator = grpc_json_add_number_string_child(
- json, json_iterator, "callsFailed", calls_failed_);
+ json, json_iterator, "callsFailed", data.calls_failed_);
}
- if (calls_started_ != 0) {
- gpr_timespec ts =
- grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME);
+ if (data.calls_started_ != 0) {
+ gpr_timespec ts = grpc_millis_to_timespec(data.last_call_started_millis_,
+ GPR_CLOCK_REALTIME);
json_iterator =
grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp",
gpr_format_timespec(ts), GRPC_JSON_STRING, true);
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
index b7ae101238..ee2ca40a20 100644
--- a/src/core/lib/channel/channelz.h
+++ b/src/core/lib/channel/channelz.h
@@ -91,12 +91,8 @@ class CallCountingHelper {
~CallCountingHelper();
void RecordCallStarted();
- void RecordCallFailed() {
- gpr_atm_no_barrier_fetch_add(&calls_failed_, static_cast<gpr_atm>(1));
- }
- void RecordCallSucceeded() {
- gpr_atm_no_barrier_fetch_add(&calls_succeeded_, static_cast<gpr_atm>(1));
- }
+ void RecordCallFailed();
+ void RecordCallSucceeded();
// Common rendering of the call count data and last_call_started_timestamp.
void PopulateCallCounts(grpc_json* json);
@@ -105,10 +101,18 @@ class CallCountingHelper {
// testing peer friend.
friend class testing::CallCountingHelperPeer;
- gpr_atm calls_started_ = 0;
- gpr_atm calls_succeeded_ = 0;
- gpr_atm calls_failed_ = 0;
- gpr_atm last_call_started_millis_ = 0;
+ struct CounterData {
+ gpr_atm calls_started_ = 0;
+ gpr_atm calls_succeeded_ = 0;
+ gpr_atm calls_failed_ = 0;
+ gpr_atm last_call_started_millis_ = 0;
+ };
+
+ // collects the sharded data into one CounterData struct.
+ CounterData Collect();
+
+ CounterData* per_cpu_counter_data_storage_ = nullptr;
+ size_t num_cores_ = 0;
};
// Handles channelz bookkeeping for channels
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index f3528d527a..e90eb54cd3 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -116,12 +116,7 @@ class ExecCtx {
ExecCtx(const ExecCtx&) = delete;
ExecCtx& operator=(const ExecCtx&) = delete;
- /** Return starting_cpu. This is only required for stats collection and is
- * hence only defined if GRPC_COLLECT_STATS is enabled.
- */
-#if defined(GRPC_COLLECT_STATS) || !defined(NDEBUG)
unsigned starting_cpu() const { return starting_cpu_; }
-#endif /* defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) */
struct CombinerData {
/* currently active combiner: updated only via combiner.c */
@@ -223,9 +218,7 @@ class ExecCtx {
CombinerData combiner_data_ = {nullptr, nullptr};
uintptr_t flags_;
-#if defined(GRPC_COLLECT_STATS) || !defined(NDEBUG)
unsigned starting_cpu_ = gpr_cpu_current_cpu();
-#endif /* defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) */
bool now_is_valid_ = false;
grpc_millis now_ = 0;
diff --git a/test/core/channel/channelz_test.cc b/test/core/channel/channelz_test.cc
index bcda30d9f2..b7b35aede4 100644
--- a/test/core/channel/channelz_test.cc
+++ b/test/core/channel/channelz_test.cc
@@ -49,8 +49,9 @@ class CallCountingHelperPeer {
public:
explicit CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {}
grpc_millis last_call_started_millis() const {
+ CallCountingHelper::CounterData data = node_->Collect();
return (grpc_millis)gpr_atm_no_barrier_load(
- &node_->last_call_started_millis_);
+ &data.last_call_started_millis_);
}
private: