aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc20
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc51
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc138
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h69
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc41
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc2
7 files changed, 131 insertions, 192 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
index 18ef1f6ff5..cc259bcdbf 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
@@ -35,9 +35,10 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
static void destroy_channel_elem(grpc_channel_element* elem) {}
namespace {
+
struct call_data {
// Stats object to update.
- grpc_grpclb_client_stats* client_stats;
+ grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats;
// State for intercepting send_initial_metadata.
grpc_closure on_complete_for_send;
grpc_closure* original_on_complete_for_send;
@@ -47,6 +48,7 @@ struct call_data {
grpc_closure* original_recv_initial_metadata_ready;
bool recv_initial_metadata_succeeded;
};
+
} // namespace
static void on_complete_for_send(void* arg, grpc_error* error) {
@@ -72,11 +74,11 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
// Get stats object from context and take a ref.
GPR_ASSERT(args->context != nullptr);
if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) {
- calld->client_stats =
- grpc_grpclb_client_stats_ref(static_cast<grpc_grpclb_client_stats*>(
- args->context[GRPC_GRPCLB_CLIENT_STATS].value));
+ calld->client_stats = static_cast<grpc_core::GrpcLbClientStats*>(
+ args->context[GRPC_GRPCLB_CLIENT_STATS].value)
+ ->Ref();
// Record call started.
- grpc_grpclb_client_stats_add_call_started(calld->client_stats);
+ calld->client_stats->AddCallStarted();
}
return GRPC_ERROR_NONE;
}
@@ -88,12 +90,12 @@ static void destroy_call_elem(grpc_call_element* elem,
if (calld->client_stats != nullptr) {
// Record call finished, optionally setting client_failed_to_send and
// received.
- grpc_grpclb_client_stats_add_call_finished(
+ calld->client_stats->AddCallFinished(
!calld->send_initial_metadata_succeeded /* client_failed_to_send */,
- calld->recv_initial_metadata_succeeded /* known_received */,
- calld->client_stats);
+ calld->recv_initial_metadata_succeeded /* known_received */);
// All done, so unref the stats object.
- grpc_grpclb_client_stats_unref(calld->client_stats);
+ // TODO(roth): Eliminate this once filter stack is converted to C++.
+ calld->client_stats.reset();
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index a31474391e..263b51ae89 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -159,9 +159,8 @@ class GrpcLb : public LoadBalancingPolicy {
// The LB token associated with the pick. This is set via user_data in
// the pick.
grpc_mdelem lb_token;
- // Stats for client-side load reporting. Note that this holds a
- // reference, which must be either passed on via context or unreffed.
- grpc_grpclb_client_stats* client_stats = nullptr;
+ // Stats for client-side load reporting.
+ RefCountedPtr<GrpcLbClientStats> client_stats;
// Next pending pick.
PendingPick* next = nullptr;
};
@@ -186,7 +185,8 @@ class GrpcLb : public LoadBalancingPolicy {
void StartQuery();
- grpc_grpclb_client_stats* client_stats() const { return client_stats_; }
+ GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
+
bool seen_initial_response() const { return seen_initial_response_; }
private:
@@ -237,7 +237,7 @@ class GrpcLb : public LoadBalancingPolicy {
// The stats for client-side load reporting associated with this LB call.
// Created after the first serverlist is received.
- grpc_grpclb_client_stats* client_stats_ = nullptr;
+ RefCountedPtr<GrpcLbClientStats> client_stats_;
grpc_millis client_stats_report_interval_ = 0;
grpc_timer client_load_report_timer_;
bool client_load_report_timer_callback_pending_ = false;
@@ -399,7 +399,7 @@ grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
if (server->drop) return false;
const grpc_grpclb_ip_address* ip = &server->ip_address;
- if (server->port >> 16 != 0) {
+ if (GPR_UNLIKELY(server->port >> 16 != 0)) {
if (log) {
gpr_log(GPR_ERROR,
"Invalid port '%d' at index %lu of serverlist. Ignoring.",
@@ -407,7 +407,7 @@ bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
}
return false;
}
- if (ip->size != 4 && ip->size != 16) {
+ if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
if (log) {
gpr_log(GPR_ERROR,
"Expected IP to be 4 or 16 bytes, got %d at index %lu of "
@@ -548,9 +548,6 @@ GrpcLb::BalancerCallState::~BalancerCallState() {
grpc_byte_buffer_destroy(send_message_payload_);
grpc_byte_buffer_destroy(recv_message_payload_);
grpc_slice_unref_internal(lb_call_status_details_);
- if (client_stats_ != nullptr) {
- grpc_grpclb_client_stats_unref(client_stats_);
- }
}
void GrpcLb::BalancerCallState::Orphan() {
@@ -673,22 +670,22 @@ void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
grpc_grpclb_request* request) {
- grpc_grpclb_dropped_call_counts* drop_entries =
- static_cast<grpc_grpclb_dropped_call_counts*>(
+ GrpcLbClientStats::DroppedCallCounts* drop_entries =
+ static_cast<GrpcLbClientStats::DroppedCallCounts*>(
request->client_stats.calls_finished_with_drop.arg);
return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 &&
request->client_stats.num_calls_finished_with_client_failed_to_send ==
0 &&
request->client_stats.num_calls_finished_known_received == 0 &&
- (drop_entries == nullptr || drop_entries->num_entries == 0);
+ (drop_entries == nullptr || drop_entries->size() == 0);
}
void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
// Construct message payload.
GPR_ASSERT(send_message_payload_ == nullptr);
grpc_grpclb_request* request =
- grpc_grpclb_load_report_request_create_locked(client_stats_);
+ grpc_grpclb_load_report_request_create_locked(client_stats_.get());
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (LoadReportCountersAreZero(request)) {
@@ -715,7 +712,7 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
grpc_call_error call_error = grpc_call_start_batch_and_execute(
lb_call_, &op, 1, &client_load_report_closure_);
- if (call_error != GRPC_CALL_OK) {
+ if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(),
call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
@@ -814,7 +811,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
// serverlist returned from the current LB call.
if (lb_calld->client_stats_report_interval_ > 0 &&
lb_calld->client_stats_ == nullptr) {
- lb_calld->client_stats_ = grpc_grpclb_client_stats_create();
+ lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
@@ -937,7 +934,7 @@ grpc_lb_addresses* ExtractBalancerAddresses(
size_t lb_addresses_idx = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (!addresses->addresses[i].is_balancer) continue;
- if (addresses->addresses[i].user_data != nullptr) {
+ if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
@@ -1288,7 +1285,7 @@ void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
- if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+ if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
// Ignore this update.
gpr_log(
GPR_ERROR,
@@ -1516,7 +1513,7 @@ grpc_error* AddLbTokenToInitialMetadata(
// Destroy function used when embedding client stats in call context.
void DestroyClientStats(void* arg) {
- grpc_grpclb_client_stats_unref(static_cast<grpc_grpclb_client_stats*>(arg));
+ static_cast<GrpcLbClientStats*>(arg)->Unref();
}
void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
@@ -1524,7 +1521,7 @@ void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
* policy (e.g., all addresses failed to connect). There won't be any
* user_data/token available */
if (pp->pick->connected_subchannel != nullptr) {
- if (!GRPC_MDISNULL(pp->lb_token)) {
+ if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
&pp->pick->lb_token_mdelem_storage,
pp->pick->initial_metadata);
@@ -1537,14 +1534,12 @@ void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
// Pass on client stats via context. Passes ownership of the reference.
if (pp->client_stats != nullptr) {
pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
- pp->client_stats;
+ pp->client_stats.release();
pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
DestroyClientStats;
}
} else {
- if (pp->client_stats != nullptr) {
- grpc_grpclb_client_stats_unref(pp->client_stats);
- }
+ pp->client_stats.reset();
}
}
@@ -1610,8 +1605,8 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
- grpc_grpclb_client_stats_add_call_dropped_locked(
- server->load_balance_token, lb_calld_->client_stats());
+ lb_calld_->client_stats()->AddCallDroppedLocked(
+ server->load_balance_token);
}
if (force_async) {
GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
@@ -1624,7 +1619,7 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
}
// Set client_stats and user_data.
if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
- pp->client_stats = grpc_grpclb_client_stats_ref(lb_calld_->client_stats());
+ pp->client_stats = lb_calld_->client_stats()->Ref();
}
GPR_ASSERT(pp->pick->user_data == nullptr);
pp->pick->user_data = (void**)&pp->lb_token;
@@ -1649,7 +1644,7 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
GPR_ASSERT(rr_policy_ == nullptr);
rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
"round_robin", args);
- if (rr_policy_ == nullptr) {
+ if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
this);
return;
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
index dfbaead7d5..087cd8f276 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
@@ -22,131 +22,65 @@
#include <string.h>
-#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/string_util.h>
-#include <grpc/support/sync.h>
-#include "src/core/lib/channel/channel_args.h"
+namespace grpc_core {
-#define GRPC_ARG_GRPCLB_CLIENT_STATS "grpc.grpclb_client_stats"
-
-struct grpc_grpclb_client_stats {
- gpr_refcount refs;
- // This field must only be accessed via *_locked() methods.
- grpc_grpclb_dropped_call_counts* drop_token_counts;
- // These fields may be accessed from multiple threads at a time.
- gpr_atm num_calls_started;
- gpr_atm num_calls_finished;
- gpr_atm num_calls_finished_with_client_failed_to_send;
- gpr_atm num_calls_finished_known_received;
-};
-
-grpc_grpclb_client_stats* grpc_grpclb_client_stats_create() {
- grpc_grpclb_client_stats* client_stats =
- static_cast<grpc_grpclb_client_stats*>(gpr_zalloc(sizeof(*client_stats)));
- gpr_ref_init(&client_stats->refs, 1);
- return client_stats;
-}
-
-grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
- grpc_grpclb_client_stats* client_stats) {
- gpr_ref(&client_stats->refs);
- return client_stats;
-}
-
-void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) {
- if (gpr_unref(&client_stats->refs)) {
- grpc_grpclb_dropped_call_counts_destroy(client_stats->drop_token_counts);
- gpr_free(client_stats);
- }
-}
-
-void grpc_grpclb_client_stats_add_call_started(
- grpc_grpclb_client_stats* client_stats) {
- gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
+void GrpcLbClientStats::AddCallStarted() {
+ gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1);
}
-void grpc_grpclb_client_stats_add_call_finished(
- bool finished_with_client_failed_to_send, bool finished_known_received,
- grpc_grpclb_client_stats* client_stats) {
- gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
+void GrpcLbClientStats::AddCallFinished(
+ bool finished_with_client_failed_to_send, bool finished_known_received) {
+ gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1);
if (finished_with_client_failed_to_send) {
- gpr_atm_full_fetch_add(
- &client_stats->num_calls_finished_with_client_failed_to_send,
- (gpr_atm)1);
+ gpr_atm_full_fetch_add(&num_calls_finished_with_client_failed_to_send_,
+ (gpr_atm)1);
}
if (finished_known_received) {
- gpr_atm_full_fetch_add(&client_stats->num_calls_finished_known_received,
- (gpr_atm)1);
+ gpr_atm_full_fetch_add(&num_calls_finished_known_received_, (gpr_atm)1);
}
}
-void grpc_grpclb_client_stats_add_call_dropped_locked(
- char* token, grpc_grpclb_client_stats* client_stats) {
+void GrpcLbClientStats::AddCallDroppedLocked(char* token) {
// Increment num_calls_started and num_calls_finished.
- gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
- gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
+ gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1);
+ gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1);
// Record the drop.
- if (client_stats->drop_token_counts == nullptr) {
- client_stats->drop_token_counts =
- static_cast<grpc_grpclb_dropped_call_counts*>(
- gpr_zalloc(sizeof(grpc_grpclb_dropped_call_counts)));
+ if (drop_token_counts_ == nullptr) {
+ drop_token_counts_.reset(New<DroppedCallCounts>());
}
- grpc_grpclb_dropped_call_counts* drop_token_counts =
- client_stats->drop_token_counts;
- for (size_t i = 0; i < drop_token_counts->num_entries; ++i) {
- if (strcmp(drop_token_counts->token_counts[i].token, token) == 0) {
- ++drop_token_counts->token_counts[i].count;
+ for (size_t i = 0; i < drop_token_counts_->size(); ++i) {
+ if (strcmp((*drop_token_counts_)[i].token.get(), token) == 0) {
+ ++(*drop_token_counts_)[i].count;
return;
}
}
- // Not found, so add a new entry. We double the size of the array each time.
- size_t new_num_entries = 2;
- while (new_num_entries < drop_token_counts->num_entries + 1) {
- new_num_entries *= 2;
- }
- drop_token_counts->token_counts = static_cast<grpc_grpclb_drop_token_count*>(
- gpr_realloc(drop_token_counts->token_counts,
- new_num_entries * sizeof(grpc_grpclb_drop_token_count)));
- grpc_grpclb_drop_token_count* new_entry =
- &drop_token_counts->token_counts[drop_token_counts->num_entries++];
- new_entry->token = gpr_strdup(token);
- new_entry->count = 1;
+ // Not found, so add a new entry.
+ drop_token_counts_->emplace_back(UniquePtr<char>(gpr_strdup(token)), 1);
}
-static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) {
- *value = static_cast<int64_t>(gpr_atm_acq_load(counter));
- gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value));
+namespace {
+
+void AtomicGetAndResetCounter(int64_t* value, gpr_atm* counter) {
+ *value = static_cast<int64_t>(gpr_atm_full_xchg(counter, (gpr_atm)0));
}
-void grpc_grpclb_client_stats_get_locked(
- grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
- int64_t* num_calls_finished,
+} // namespace
+
+void GrpcLbClientStats::GetLocked(
+ int64_t* num_calls_started, int64_t* num_calls_finished,
int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received,
- grpc_grpclb_dropped_call_counts** drop_token_counts) {
- atomic_get_and_reset_counter(num_calls_started,
- &client_stats->num_calls_started);
- atomic_get_and_reset_counter(num_calls_finished,
- &client_stats->num_calls_finished);
- atomic_get_and_reset_counter(
- num_calls_finished_with_client_failed_to_send,
- &client_stats->num_calls_finished_with_client_failed_to_send);
- atomic_get_and_reset_counter(
- num_calls_finished_known_received,
- &client_stats->num_calls_finished_known_received);
- *drop_token_counts = client_stats->drop_token_counts;
- client_stats->drop_token_counts = nullptr;
+ UniquePtr<DroppedCallCounts>* drop_token_counts) {
+ AtomicGetAndResetCounter(num_calls_started, &num_calls_started_);
+ AtomicGetAndResetCounter(num_calls_finished, &num_calls_finished_);
+ AtomicGetAndResetCounter(num_calls_finished_with_client_failed_to_send,
+ &num_calls_finished_with_client_failed_to_send_);
+ AtomicGetAndResetCounter(num_calls_finished_known_received,
+ &num_calls_finished_known_received_);
+ *drop_token_counts = std::move(drop_token_counts_);
}
-void grpc_grpclb_dropped_call_counts_destroy(
- grpc_grpclb_dropped_call_counts* drop_entries) {
- if (drop_entries != nullptr) {
- for (size_t i = 0; i < drop_entries->num_entries; ++i) {
- gpr_free(drop_entries->token_counts[i].token);
- }
- gpr_free(drop_entries->token_counts);
- gpr_free(drop_entries);
- }
-}
+} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
index c971e56883..18ab2c9452 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
@@ -21,47 +21,52 @@
#include <grpc/support/port_platform.h>
-#include <stdbool.h>
+#include <grpc/support/atm.h>
-#include <grpc/impl/codegen/grpc_types.h>
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/ref_counted.h"
-typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats;
+namespace grpc_core {
-typedef struct {
- char* token;
- int64_t count;
-} grpc_grpclb_drop_token_count;
+class GrpcLbClientStats : public RefCounted<GrpcLbClientStats> {
+ public:
+ struct DropTokenCount {
+ UniquePtr<char> token;
+ int64_t count;
-typedef struct {
- grpc_grpclb_drop_token_count* token_counts;
- size_t num_entries;
-} grpc_grpclb_dropped_call_counts;
+ DropTokenCount(UniquePtr<char> token, int64_t count)
+ : token(std::move(token)), count(count) {}
+ };
-grpc_grpclb_client_stats* grpc_grpclb_client_stats_create();
-grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
- grpc_grpclb_client_stats* client_stats);
-void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats);
+ typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts;
-void grpc_grpclb_client_stats_add_call_started(
- grpc_grpclb_client_stats* client_stats);
-void grpc_grpclb_client_stats_add_call_finished(
- bool finished_with_client_failed_to_send, bool finished_known_received,
- grpc_grpclb_client_stats* client_stats);
+ GrpcLbClientStats() {}
-// This method is not thread-safe; caller must synchronize.
-void grpc_grpclb_client_stats_add_call_dropped_locked(
- char* token, grpc_grpclb_client_stats* client_stats);
+ void AddCallStarted();
+ void AddCallFinished(bool finished_with_client_failed_to_send,
+ bool finished_known_received);
-// This method is not thread-safe; caller must synchronize.
-void grpc_grpclb_client_stats_get_locked(
- grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
- int64_t* num_calls_finished,
- int64_t* num_calls_finished_with_client_failed_to_send,
- int64_t* num_calls_finished_known_received,
- grpc_grpclb_dropped_call_counts** drop_token_counts);
+ // This method is not thread-safe; caller must synchronize.
+ void AddCallDroppedLocked(char* token);
-void grpc_grpclb_dropped_call_counts_destroy(
- grpc_grpclb_dropped_call_counts* drop_entries);
+ // This method is not thread-safe; caller must synchronize.
+ void GetLocked(int64_t* num_calls_started, int64_t* num_calls_finished,
+ int64_t* num_calls_finished_with_client_failed_to_send,
+ int64_t* num_calls_finished_known_received,
+ UniquePtr<DroppedCallCounts>* drop_token_counts);
+
+ private:
+ // This field must only be accessed via *_locked() methods.
+ UniquePtr<DroppedCallCounts> drop_token_counts_;
+ // These fields may be accessed from multiple threads at a time.
+ gpr_atm num_calls_started_ = 0;
+ gpr_atm num_calls_finished_ = 0;
+ gpr_atm num_calls_finished_with_client_failed_to_send_ = 0;
+ gpr_atm num_calls_finished_known_received_ = 0;
+};
+
+} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \
*/
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
index 7ef3bcf24f..ed246273c9 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
@@ -29,7 +29,7 @@ static bool count_serverlist(pb_istream_t* stream, const pb_field_t* field,
void** arg) {
grpc_grpclb_serverlist* sl = static_cast<grpc_grpclb_serverlist*>(*arg);
grpc_grpclb_server server;
- if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
+ if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, &server))) {
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
}
@@ -52,7 +52,7 @@ static bool decode_serverlist(pb_istream_t* stream, const pb_field_t* field,
GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx);
grpc_grpclb_server* server =
static_cast<grpc_grpclb_server*>(gpr_zalloc(sizeof(grpc_grpclb_server)));
- if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
+ if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, server))) {
gpr_free(server);
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
@@ -89,16 +89,16 @@ static bool encode_string(pb_ostream_t* stream, const pb_field_t* field,
static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field,
void* const* arg) {
- grpc_grpclb_dropped_call_counts* drop_entries =
- static_cast<grpc_grpclb_dropped_call_counts*>(*arg);
+ grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries =
+ static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>(*arg);
if (drop_entries == nullptr) return true;
- for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+ for (size_t i = 0; i < drop_entries->size(); ++i) {
if (!pb_encode_tag_for_field(stream, field)) return false;
grpc_lb_v1_ClientStatsPerToken drop_message;
drop_message.load_balance_token.funcs.encode = encode_string;
- drop_message.load_balance_token.arg = drop_entries->token_counts[i].token;
+ drop_message.load_balance_token.arg = (*drop_entries)[i].token.get();
drop_message.has_num_calls = true;
- drop_message.num_calls = drop_entries->token_counts[i].count;
+ drop_message.num_calls = (*drop_entries)[i].count;
if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
&drop_message)) {
return false;
@@ -108,7 +108,7 @@ static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field,
}
grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
- grpc_grpclb_client_stats* client_stats) {
+ grpc_core::GrpcLbClientStats* client_stats) {
grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>(
gpr_zalloc(sizeof(grpc_grpclb_request)));
req->has_client_stats = true;
@@ -120,13 +120,15 @@ grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_known_received = true;
req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
- grpc_grpclb_client_stats_get_locked(
- client_stats, &req->client_stats.num_calls_started,
+ grpc_core::UniquePtr<grpc_core::GrpcLbClientStats::DroppedCallCounts>
+ drop_counts;
+ client_stats->GetLocked(
+ &req->client_stats.num_calls_started,
&req->client_stats.num_calls_finished,
&req->client_stats.num_calls_finished_with_client_failed_to_send,
- &req->client_stats.num_calls_finished_known_received,
- reinterpret_cast<grpc_grpclb_dropped_call_counts**>(
- &req->client_stats.calls_finished_with_drop.arg));
+ &req->client_stats.num_calls_finished_known_received, &drop_counts);
+ // Will be deleted in grpc_grpclb_request_destroy().
+ req->client_stats.calls_finished_with_drop.arg = drop_counts.release();
return req;
}
@@ -149,10 +151,10 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request* request) {
void grpc_grpclb_request_destroy(grpc_grpclb_request* request) {
if (request->has_client_stats) {
- grpc_grpclb_dropped_call_counts* drop_entries =
- static_cast<grpc_grpclb_dropped_call_counts*>(
+ grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries =
+ static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>(
request->client_stats.calls_finished_with_drop.arg);
- grpc_grpclb_dropped_call_counts_destroy(drop_entries);
+ grpc_core::Delete(drop_entries);
}
gpr_free(request);
}
@@ -165,7 +167,8 @@ grpc_grpclb_initial_response* grpc_grpclb_initial_response_parse(
GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response));
grpc_grpclb_response res;
memset(&res, 0, sizeof(grpc_grpclb_response));
- if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) {
+ if (GPR_UNLIKELY(
+ !pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res))) {
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return nullptr;
}
@@ -195,7 +198,7 @@ grpc_grpclb_serverlist* grpc_grpclb_response_parse_serverlist(
res.server_list.servers.funcs.decode = count_serverlist;
res.server_list.servers.arg = sl;
bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
- if (!status) {
+ if (GPR_UNLIKELY(!status)) {
gpr_free(sl);
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return nullptr;
@@ -211,7 +214,7 @@ grpc_grpclb_serverlist* grpc_grpclb_response_parse_serverlist(
res.server_list.servers.arg = &decode_arg;
status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields,
&res);
- if (!status) {
+ if (GPR_UNLIKELY(!status)) {
grpc_grpclb_destroy_serverlist(sl);
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return nullptr;
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index d4270f2536..06810a9fe8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -42,7 +42,7 @@ typedef struct {
/** Create a request for a gRPC LB service under \a lb_service_name */
grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name);
grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
- grpc_grpclb_client_stats* client_stats);
+ grpc_core::GrpcLbClientStats* client_stats);
/** Protocol Buffers v3-encode \a request */
grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request* request);
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 79e8ad5663..b177385065 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -608,7 +608,7 @@ void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
- if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+ if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
// Otherwise, keep using the current subchannel list (ignore this update).