diff options
-rw-r--r-- | src/core/lib/channel/channelz.cc | 73 | ||||
-rw-r--r-- | src/core/lib/channel/channelz.h | 24 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.h | 7 | ||||
-rw-r--r-- | test/core/channel/channelz_test.cc | 3 |
4 files changed, 74 insertions, 33 deletions
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 339c827525..e9c29489a2 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -34,6 +34,7 @@ #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/error_utils.h" @@ -55,35 +56,77 @@ char* BaseNode::RenderJsonString() { } CallCountingHelper::CallCountingHelper() { - gpr_atm_no_barrier_store(&last_call_started_millis_, - (gpr_atm)ExecCtx::Get()->Now()); + num_cores_ = GPR_MAX(1, gpr_cpu_num_cores()); + per_cpu_counter_data_storage_ = + static_cast<CounterData*>(gpr_zalloc(sizeof(CounterData) * num_cores_)); } -CallCountingHelper::~CallCountingHelper() {} +CallCountingHelper::~CallCountingHelper() { + gpr_free(per_cpu_counter_data_storage_); +} void CallCountingHelper::RecordCallStarted() { - gpr_atm_no_barrier_fetch_add(&calls_started_, static_cast<gpr_atm>(1)); - gpr_atm_no_barrier_store(&last_call_started_millis_, - (gpr_atm)ExecCtx::Get()->Now()); + gpr_atm_no_barrier_fetch_add( + &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()] + .calls_started_, + static_cast<gpr_atm>(1)); + gpr_atm_no_barrier_store( + &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()] + .last_call_started_millis_, + (gpr_atm)ExecCtx::Get()->Now()); +} + +void CallCountingHelper::RecordCallFailed() { + gpr_atm_no_barrier_fetch_add( + &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()] + .calls_failed_, + static_cast<gpr_atm>(1)); +} + +void CallCountingHelper::RecordCallSucceeded() { + gpr_atm_no_barrier_fetch_add( + &per_cpu_counter_data_storage_[grpc_core::ExecCtx::Get()->starting_cpu()] + .calls_succeeded_, + static_cast<gpr_atm>(1)); +} + +CallCountingHelper::CounterData CallCountingHelper::Collect() { + CounterData out; + memset(&out, 0, sizeof(out)); + for (size_t core = 0; core < num_cores_; ++core) { + out.calls_started_ += gpr_atm_no_barrier_load( + &per_cpu_counter_data_storage_[core].calls_started_); + out.calls_succeeded_ += gpr_atm_no_barrier_load( + &per_cpu_counter_data_storage_[core].calls_succeeded_); + out.calls_failed_ += gpr_atm_no_barrier_load( + &per_cpu_counter_data_storage_[core].calls_failed_); + gpr_atm last_call = gpr_atm_no_barrier_load( + &per_cpu_counter_data_storage_[core].last_call_started_millis_); + if (last_call > out.last_call_started_millis_) { + out.last_call_started_millis_ = last_call; + } + } + return out; } void CallCountingHelper::PopulateCallCounts(grpc_json* json) { grpc_json* json_iterator = nullptr; - if (calls_started_ != 0) { + CounterData data = Collect(); + if (data.calls_started_ != 0) { json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsStarted", calls_started_); + json, json_iterator, "callsStarted", data.calls_started_); } - if (calls_succeeded_ != 0) { + if (data.calls_succeeded_ != 0) { json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsSucceeded", calls_succeeded_); + json, json_iterator, "callsSucceeded", data.calls_succeeded_); } - if (calls_failed_) { + if (data.calls_failed_) { json_iterator = grpc_json_add_number_string_child( - json, json_iterator, "callsFailed", calls_failed_); + json, json_iterator, "callsFailed", data.calls_failed_); } - if (calls_started_ != 0) { - gpr_timespec ts = - grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); + if (data.calls_started_ != 0) { + gpr_timespec ts = grpc_millis_to_timespec(data.last_call_started_millis_, + GPR_CLOCK_REALTIME); json_iterator = grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", gpr_format_timespec(ts), GRPC_JSON_STRING, true); diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index b7ae101238..ee2ca40a20 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -91,12 +91,8 @@ class CallCountingHelper { ~CallCountingHelper(); void RecordCallStarted(); - void RecordCallFailed() { - gpr_atm_no_barrier_fetch_add(&calls_failed_, static_cast<gpr_atm>(1)); - } - void RecordCallSucceeded() { - gpr_atm_no_barrier_fetch_add(&calls_succeeded_, static_cast<gpr_atm>(1)); - } + void RecordCallFailed(); + void RecordCallSucceeded(); // Common rendering of the call count data and last_call_started_timestamp. void PopulateCallCounts(grpc_json* json); @@ -105,10 +101,18 @@ class CallCountingHelper { // testing peer friend. friend class testing::CallCountingHelperPeer; - gpr_atm calls_started_ = 0; - gpr_atm calls_succeeded_ = 0; - gpr_atm calls_failed_ = 0; - gpr_atm last_call_started_millis_ = 0; + struct CounterData { + gpr_atm calls_started_ = 0; + gpr_atm calls_succeeded_ = 0; + gpr_atm calls_failed_ = 0; + gpr_atm last_call_started_millis_ = 0; + }; + + // collects the sharded data into one CounterData struct. + CounterData Collect(); + + CounterData* per_cpu_counter_data_storage_ = nullptr; + size_t num_cores_ = 0; }; // Handles channelz bookkeeping for channels diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index f3528d527a..e90eb54cd3 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -116,12 +116,7 @@ class ExecCtx { ExecCtx(const ExecCtx&) = delete; ExecCtx& operator=(const ExecCtx&) = delete; - /** Return starting_cpu. This is only required for stats collection and is - * hence only defined if GRPC_COLLECT_STATS is enabled. - */ -#if defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) unsigned starting_cpu() const { return starting_cpu_; } -#endif /* defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) */ struct CombinerData { /* currently active combiner: updated only via combiner.c */ @@ -223,9 +218,7 @@ class ExecCtx { CombinerData combiner_data_ = {nullptr, nullptr}; uintptr_t flags_; -#if defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) unsigned starting_cpu_ = gpr_cpu_current_cpu(); -#endif /* defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) */ bool now_is_valid_ = false; grpc_millis now_ = 0; diff --git a/test/core/channel/channelz_test.cc b/test/core/channel/channelz_test.cc index bcda30d9f2..b7b35aede4 100644 --- a/test/core/channel/channelz_test.cc +++ b/test/core/channel/channelz_test.cc @@ -49,8 +49,9 @@ class CallCountingHelperPeer { public: explicit CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {} grpc_millis last_call_started_millis() const { + CallCountingHelper::CounterData data = node_->Collect(); return (grpc_millis)gpr_atm_no_barrier_load( - &node_->last_call_started_millis_); + &data.last_call_started_millis_); } private: |