aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/load_reporter/load_reporter.h
blob: b2254d56016042450c928ad5a01b91b36cab3223 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H

#include <grpc/support/port_platform.h>

#include <atomic>
#include <chrono>
#include <deque>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config.h>

#include "src/cpp/server/load_reporter/load_data_store.h"
#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"

#include "opencensus/stats/stats.h"

namespace grpc {
namespace load_reporter {

// The interface to get the Census stats. Abstracted for mocking.
//
// Implementations supply FetchViewData(); the two static helpers look up a
// single row in a ViewDataMap that has already been fetched.
class CensusViewProvider {
 public:
  // Maps from the view name to the view data.
  using ViewDataMap =
      std::unordered_map<grpc::string, ::opencensus::stats::ViewData>;
  // Maps from the view name to the view descriptor.
  using ViewDescriptorMap =
      std::unordered_map<grpc::string, ::opencensus::stats::ViewDescriptor>;

  // Defined out of line. NOTE(review): presumably fills view_descriptor_map_
  // and the tag keys declared below -- confirm against the .cc file.
  CensusViewProvider();
  // Virtual so that an implementation can be destroyed through a base pointer.
  virtual ~CensusViewProvider() = default;

  // Fetches the view data accumulated since last fetching, and returns it as a
  // map from the view name to the view data.
  virtual ViewDataMap FetchViewData() = 0;

  // A helper function that gets a row with the input tag values from the view
  // data. Only used when we know that row must exist because we have seen a row
  // with the same tag values in a related view data. Several ViewData's are
  // considered related if their views are based on the measures that are always
  // recorded at the same time.
  //
  // view_data_map: the map to search.
  // view_name, view_name_len: NOTE(review): presumably the characters and
  //     length of the key to look up in view_data_map -- confirm in the .cc.
  // tag_values: the tag values identifying the row.
  // Returns the row's value as a double. What happens if the row is missing is
  // decided by the out-of-line definition and is not visible here.
  static double GetRelatedViewDataRowDouble(
      const ViewDataMap& view_data_map, const char* view_name,
      size_t view_name_len, const std::vector<grpc::string>& tag_values);
  // Same lookup as GetRelatedViewDataRowDouble(), but returns the row's value
  // as an unsigned 64-bit integer.
  static uint64_t GetRelatedViewDataRowInt(
      const ViewDataMap& view_data_map, const char* view_name,
      size_t view_name_len, const std::vector<grpc::string>& tag_values);

 protected:
  // Read-only access to the view descriptors for derived classes.
  const ViewDescriptorMap& view_descriptor_map() const {
    return view_descriptor_map_;
  }

 private:
  // View descriptors keyed by view name (see ViewDescriptorMap).
  ViewDescriptorMap view_descriptor_map_;
  // Tag keys.
  ::opencensus::stats::TagKey tag_key_token_;
  ::opencensus::stats::TagKey tag_key_host_;
  ::opencensus::stats::TagKey tag_key_user_id_;
  ::opencensus::stats::TagKey tag_key_status_;
  ::opencensus::stats::TagKey tag_key_metric_name_;
};

// The default implementation fetches the real stats from Census.
class CensusViewProviderDefaultImpl : public CensusViewProvider {
 public:
  // Defined out of line. NOTE(review): presumably creates the entries of
  // view_map_ -- confirm against the .cc file.
  CensusViewProviderDefaultImpl();

  // Implements CensusViewProvider::FetchViewData().
  ViewDataMap FetchViewData() override;

 private:
  // The Census views held by this provider. NOTE(review): the string key is
  // presumably the view name, mirroring ViewDataMap -- confirm in the .cc.
  std::unordered_map<grpc::string, ::opencensus::stats::View> view_map_;
};

// Abstract source of CPU stats. Kept behind an interface so that tests can
// substitute a mock.
class CpuStatsProvider {
 public:
  virtual ~CpuStatsProvider() = default;

  // One sample: the used amount of CPU first, the total amount second.
  using CpuStatsSample = std::pair<uint64_t, uint64_t>;

  // Returns the cumulative used CPU together with the total CPU resource.
  virtual CpuStatsSample GetCpuStats() = 0;
};

// The default implementation reads CPU jiffies from the system to calculate CPU
// utilization.
class CpuStatsProviderDefaultImpl : public CpuStatsProvider {
 public:
  // Implements CpuStatsProvider::GetCpuStats(). Defined out of line, so how the
  // numbers are obtained on a given platform is not visible in this header.
  CpuStatsSample GetCpuStats() override;
};

// Maintains all the load data and load reporting streams.
//
// Owns a Census view provider and a CPU stats provider, a load data store, and
// a deque of load-balancing feedback records that forms the sliding window.
class LoadReporter {
 public:
  // Takes ownership of both providers and appends an initial feedback record.
  //
  // feedback_sample_window_seconds: length, in seconds, of the sliding window
  //     that feedback records are checked against (see IsRecordInWindow()).
  // census_view_provider: the source of Census view data.
  // cpu_stats_provider: the source of CPU stats.
  //
  // TODO(juanlishen): Allow config for providers from users.
  LoadReporter(uint32_t feedback_sample_window_seconds,
               std::unique_ptr<CensusViewProvider> census_view_provider,
               std::unique_ptr<CpuStatsProvider> cpu_stats_provider)
      : feedback_sample_window_seconds_(feedback_sample_window_seconds),
        census_view_provider_(std::move(census_view_provider)),
        cpu_stats_provider_(std::move(cpu_stats_provider)) {
    // Append the initial record so that the next real record can have a base.
    AppendNewFeedbackRecord(0, 0);
  }

  // Fetches the latest data from Census and merge it into the data store.
  // Also adds a new sample to the LB feedback sliding window.
  // Thread-unsafe. (1). The access to the load data store and feedback records
  // has locking. (2). The access to the Census view provider and CPU stats
  // provider lacks locking, but we only access these two members in this method
  // (in testing, we also access them when setting up expectation). So the
  // invocations of this method must be serialized.
  void FetchAndSample();

  // Generates a report for that host and balancer. The report contains
  // all the stats data accumulated between the last report (i.e., the last
  // consumption) and the last fetch from Census (i.e., the last production).
  // Thread-safe.
  ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads(
      const grpc::string& hostname, const grpc::string& lb_id);

  // The feedback is calculated from the stats data recorded in the sliding
  // window. Outdated records are discarded.
  // Thread-safe.
  ::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback();

  // Wrapper around LoadDataStore::ReportStreamCreated.
  // Thread-safe.
  void ReportStreamCreated(const grpc::string& hostname,
                           const grpc::string& lb_id,
                           const grpc::string& load_key);

  // Wrapper around LoadDataStore::ReportStreamClosed.
  // Thread-safe.
  void ReportStreamClosed(const grpc::string& hostname,
                          const grpc::string& lb_id);

  // Generates a unique LB ID of length kLbIdLength. Returns an empty string
  // upon failure. Thread-safe.
  grpc::string GenerateLbId();

  // Accessors only for testing. They return non-owning pointers; the providers
  // stay owned by this LoadReporter.
  CensusViewProvider* census_view_provider() {
    return census_view_provider_.get();
  }
  CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); }

 private:
  // One entry of the LB feedback sliding window.
  struct LoadBalancingFeedbackRecord {
    // Compared against the window start in IsRecordInWindow().
    std::chrono::system_clock::time_point end_time;
    uint64_t rpcs;
    uint64_t errors;
    uint64_t cpu_usage;
    uint64_t cpu_limit;

    // Stores every argument in the member of the same name.
    LoadBalancingFeedbackRecord(
        const std::chrono::system_clock::time_point& end_time, uint64_t rpcs,
        uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit)
        : end_time(end_time),
          rpcs(rpcs),
          errors(errors),
          cpu_usage(cpu_usage),
          cpu_limit(cpu_limit) {}
  };

  // Finds the view data about starting call from the view_data_map and merges
  // the data to the load data store.
  void ProcessViewDataCallStart(
      const CensusViewProvider::ViewDataMap& view_data_map);
  // Finds the view data about ending call from the view_data_map and merges the
  // data to the load data store.
  void ProcessViewDataCallEnd(
      const CensusViewProvider::ViewDataMap& view_data_map);
  // Finds the view data about the customized call metrics from the
  // view_data_map and merges the data to the load data store.
  void ProcessViewDataOtherCallMetrics(
      const CensusViewProvider::ViewDataMap& view_data_map);

  // Returns true if the record's end_time is strictly later than
  // (now - feedback_sample_window_seconds_), i.e. the record still falls inside
  // the sliding window. Declared const because it only reads its arguments and
  // an immutable member.
  bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record,
                        std::chrono::system_clock::time_point now) const {
    return record.end_time > now - feedback_sample_window_seconds_;
  }

  // Defined out of line. NOTE(review): presumably samples the CPU stats and
  // pushes a new record onto feedback_records_ -- confirm in the .cc file.
  void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors);

  // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches
  // it to the load.
  void AttachOrphanLoadId(::grpc::lb::v1::Load* load,
                          const PerBalancerStore& per_balancer_store);

  // NOTE(review): presumably the counter behind GenerateLbId() -- confirm.
  std::atomic<int64_t> next_lb_id_{0};
  // Length of the feedback sliding window; fixed at construction.
  const std::chrono::seconds feedback_sample_window_seconds_;
  // NOTE(review): presumably guards feedback_records_ -- confirm in the .cc.
  std::mutex feedback_mu_;
  std::deque<LoadBalancingFeedbackRecord> feedback_records_;
  // TODO(juanlishen): Lock in finer grain. Locking the whole store may be
  // too expensive.
  std::mutex store_mu_;
  LoadDataStore load_data_store_;
  std::unique_ptr<CensusViewProvider> census_view_provider_;
  std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;
};

}  // namespace load_reporter
}  // namespace grpc

#endif  // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H