aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-10-15 22:22:27 +0000
committerGravatar Craig Tiller <ctiller@google.com>2017-10-15 22:22:27 +0000
commit1406f9c524029b5725dfc578551366e54c6494fb (patch)
treed9cfbd7df1cdcc6655717837749704724b25dc4b /src/core/lib
parentad059f70f8bccee3ae1a0ef1568c8d89c0c1004d (diff)
parent742ca098191c69ff23bf76b47989f21809c09cc3 (diff)
Merge branch 'bs2' into epexinf
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/debug/stats_data.cc9
-rw-r--r--src/core/lib/debug/stats_data.h15
-rw-r--r--src/core/lib/debug/stats_data.yaml11
-rw-r--r--src/core/lib/debug/stats_data_bq_schema.sql4
-rw-r--r--src/core/lib/iomgr/call_combiner.cc11
-rw-r--r--src/core/lib/iomgr/combiner.cc1
-rw-r--r--src/core/lib/iomgr/port.h10
-rw-r--r--src/core/lib/support/manual_constructor.h76
-rw-r--r--src/core/lib/support/time_posix.cc2
-rw-r--r--src/core/lib/transport/bdp_estimator.cc128
-rw-r--r--src/core/lib/transport/bdp_estimator.h132
11 files changed, 256 insertions, 143 deletions
diff --git a/src/core/lib/debug/stats_data.cc b/src/core/lib/debug/stats_data.cc
index b4ae8f312c..5d737c56cb 100644
--- a/src/core/lib/debug/stats_data.cc
+++ b/src/core/lib/debug/stats_data.cc
@@ -104,6 +104,10 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"combiner_locks_scheduled_items",
"combiner_locks_scheduled_final_items",
"combiner_locks_offloaded",
+ "call_combiner_locks_initiated",
+ "call_combiner_locks_scheduled_items",
+ "call_combiner_set_notify_on_cancel",
+ "call_combiner_cancelled",
"executor_scheduled_short_items",
"executor_scheduled_long_items",
"executor_scheduled_to_self",
@@ -213,6 +217,11 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of items scheduled against combiner locks",
"Number of final items scheduled against combiner locks",
"Number of combiner locks offloaded to different threads",
+ "Number of call combiner lock entries by process (first items queued to a "
+ "call combiner)",
+ "Number of items scheduled against call combiner locks",
+ "Number of times a cancellation callback was set on a call combiner",
+ "Number of times a call combiner was cancelled",
"Number of finite runtime closures scheduled against the executor (gRPC "
"thread pool)",
"Number of potentially infinite runtime closures scheduled against the "
diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h
index d4c4437ca0..031942df5c 100644
--- a/src/core/lib/debug/stats_data.h
+++ b/src/core/lib/debug/stats_data.h
@@ -110,6 +110,10 @@ typedef enum {
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED,
+ GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED,
+ GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS,
+ GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL,
+ GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_LONG_ITEMS,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF,
@@ -407,6 +411,17 @@ typedef enum {
#define GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED)
+#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), \
+ GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED)
+#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS)
+#define GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL)
+#define GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED)
#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS)
diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml
index 00e81e74e2..af4553028e 100644
--- a/src/core/lib/debug/stats_data.yaml
+++ b/src/core/lib/debug/stats_data.yaml
@@ -245,6 +245,16 @@
doc: Number of final items scheduled against combiner locks
- counter: combiner_locks_offloaded
doc: Number of combiner locks offloaded to different threads
+# call combiner locks
+- counter: call_combiner_locks_initiated
+ doc: Number of call combiner lock entries by process
+ (first items queued to a call combiner)
+- counter: call_combiner_locks_scheduled_items
+ doc: Number of items scheduled against call combiner locks
+- counter: call_combiner_set_notify_on_cancel
+ doc: Number of times a cancellation callback was set on a call combiner
+- counter: call_combiner_cancelled
+ doc: Number of times a call combiner was cancelled
# executor
- counter: executor_scheduled_short_items
doc: Number of finite runtime closures scheduled against the executor
@@ -282,4 +292,3 @@
- counter: cq_ev_queue_transient_pop_failures
doc: Number of times NULL was popped out of completion queue's event queue
even though the event queue was not empty
-
diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql
index f34b0c25a0..04b6d471f6 100644
--- a/src/core/lib/debug/stats_data_bq_schema.sql
+++ b/src/core/lib/debug/stats_data_bq_schema.sql
@@ -79,6 +79,10 @@ combiner_locks_initiated_per_iteration:FLOAT,
combiner_locks_scheduled_items_per_iteration:FLOAT,
combiner_locks_scheduled_final_items_per_iteration:FLOAT,
combiner_locks_offloaded_per_iteration:FLOAT,
+call_combiner_locks_initiated_per_iteration:FLOAT,
+call_combiner_locks_scheduled_items_per_iteration:FLOAT,
+call_combiner_set_notify_on_cancel_per_iteration:FLOAT,
+call_combiner_cancelled_per_iteration:FLOAT,
executor_scheduled_short_items_per_iteration:FLOAT,
executor_scheduled_long_items_per_iteration:FLOAT,
executor_scheduled_to_self_per_iteration:FLOAT,
diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc
index bab3df021a..d45719608b 100644
--- a/src/core/lib/iomgr/call_combiner.cc
+++ b/src/core/lib/iomgr/call_combiner.cc
@@ -21,6 +21,8 @@
#include <inttypes.h>
#include <grpc/support/log.h>
+#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/profiling/timers.h"
grpc_tracer_flag grpc_call_combiner_trace =
GRPC_TRACER_INITIALIZER(false, "call_combiner");
@@ -60,6 +62,7 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
grpc_closure* closure,
grpc_error* error DEBUG_ARGS,
const char* reason) {
+ GPR_TIMER_BEGIN("call_combiner_start", 0);
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
gpr_log(GPR_DEBUG,
"==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
@@ -73,7 +76,10 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
prev_size + 1);
}
+ GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx);
if (prev_size == 0) {
+ GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx);
+ GPR_TIMER_MARK("call_combiner_initiate", 0);
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
}
@@ -87,11 +93,13 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
closure->error_data.error = error;
gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure);
}
+ GPR_TIMER_END("call_combiner_start", 0);
}
void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner DEBUG_ARGS,
const char* reason) {
+ GPR_TIMER_BEGIN("call_combiner_stop", 0);
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
gpr_log(GPR_DEBUG,
"==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
@@ -130,11 +138,13 @@ void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
} else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
gpr_log(GPR_DEBUG, " queue empty");
}
+ GPR_TIMER_END("call_combiner_stop", 0);
}
void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner,
grpc_closure* closure) {
+ GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx);
while (true) {
// Decode original state.
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
@@ -179,6 +189,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner,
grpc_error* error) {
+ GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx);
while (true) {
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
grpc_error* original_error = decode_cancel_state_error(original_state);
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index 0e707ef839..53f4b7eaa7 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -165,6 +165,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
lock, cl, last));
if (last == 1) {
GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx);
+ GPR_TIMER_MARK("combiner.initiated", 0);
gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null,
(gpr_atm)exec_ctx);
// first element on this list: add it to the list of combiner locks
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index 42033d0ba4..1cc6d98491 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -109,6 +109,16 @@
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
+#elif defined(GPR_OPENBSD)
+#define GRPC_HAVE_IFADDRS 1
+#define GRPC_HAVE_IPV6_RECVPKTINFO 1
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETADDR 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_NACL)
#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
diff --git a/src/core/lib/support/manual_constructor.h b/src/core/lib/support/manual_constructor.h
new file mode 100644
index 0000000000..d753cf98a0
--- /dev/null
+++ b/src/core/lib/support/manual_constructor.h
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_SUPPORT_MANUAL_CONSTRUCTOR_H
+#define GRPC_CORE_LIB_SUPPORT_MANUAL_CONSTRUCTOR_H
+
+// manually construct a region of memory with some type
+
+#include <stddef.h>
+#include <new>
+#include <type_traits>
+#include <utility>
+
+namespace grpc_core {
+
+template <typename Type>
+class ManualConstructor {
+ public:
+ // No constructor or destructor because one of the most useful uses of
+ // this class is as part of a union, and members of a union could not have
+ // constructors or destructors till C++11. And, anyway, the whole point of
+ // this class is to bypass constructor and destructor.
+
+ Type* get() { return reinterpret_cast<Type*>(&space_); }
+ const Type* get() const { return reinterpret_cast<const Type*>(&space_); }
+
+ Type* operator->() { return get(); }
+ const Type* operator->() const { return get(); }
+
+ Type& operator*() { return *get(); }
+ const Type& operator*() const { return *get(); }
+
+ void Init() { new (&space_) Type; }
+
+ // Init() constructs the Type instance using the given arguments
+ // (which are forwarded to Type's constructor).
+ //
+ // Note that Init() with no arguments performs default-initialization,
+ // not zero-initialization (i.e it behaves the same as "new Type;", not
+ // "new Type();"), so it will leave non-class types uninitialized.
+ template <typename... Ts>
+ void Init(Ts&&... args) {
+ new (&space_) Type(std::forward<Ts>(args)...);
+ }
+
+ // Init() that is equivalent to copy and move construction.
+ // Enables usage like this:
+ // ManualConstructor<std::vector<int>> v;
+ // v.Init({1, 2, 3});
+ void Init(const Type& x) { new (&space_) Type(x); }
+ void Init(Type&& x) { new (&space_) Type(std::move(x)); }
+
+ void Destroy() { get()->~Type(); }
+
+ private:
+ typename std::aligned_storage<sizeof(Type), alignof(Type)>::type space_;
+};
+
+} // namespace grpc_core
+
+#endif
diff --git a/src/core/lib/support/time_posix.cc b/src/core/lib/support/time_posix.cc
index 3267ea6b54..3f8a9094fd 100644
--- a/src/core/lib/support/time_posix.cc
+++ b/src/core/lib/support/time_posix.cc
@@ -42,7 +42,7 @@ static struct timespec timespec_from_gpr(gpr_timespec gts) {
return rv;
}
-#if _POSIX_TIMERS > 0
+#if _POSIX_TIMERS > 0 || defined(__OpenBSD__)
static gpr_timespec gpr_from_timespec(struct timespec ts,
gpr_clock_type clock_type) {
/*
diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc
index 6ed427ce5c..2a1c97c84e 100644
--- a/src/core/lib/transport/bdp_estimator.cc
+++ b/src/core/lib/transport/bdp_estimator.cc
@@ -21,117 +21,65 @@
#include <inttypes.h>
#include <stdlib.h>
-#include <grpc/support/log.h>
#include <grpc/support/useful.h>
grpc_tracer_flag grpc_bdp_estimator_trace =
GRPC_TRACER_INITIALIZER(false, "bdp_estimator");
-void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) {
- estimator->estimate = 65536;
- estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
- estimator->ping_start_time = gpr_time_0(GPR_CLOCK_MONOTONIC);
- estimator->next_ping_scheduled = 0;
- estimator->name = name;
- estimator->bw_est = 0;
- estimator->inter_ping_delay = 100.0; // start at 100ms
- estimator->stable_estimate_count = 0;
-}
-
-bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator,
- int64_t *estimate) {
- *estimate = estimator->estimate;
- return true;
-}
-
-bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator,
- double *bw) {
- *bw = estimator->bw_est;
- return true;
-}
-
-void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
- int64_t num_bytes) {
- estimator->accumulator += num_bytes;
-}
-
-bool grpc_bdp_estimator_need_ping(grpc_exec_ctx *exec_ctx,
- const grpc_bdp_estimator *estimator) {
- switch (estimator->ping_state) {
- case GRPC_BDP_PING_UNSCHEDULED:
- return grpc_exec_ctx_now(exec_ctx) >= estimator->next_ping_scheduled;
- case GRPC_BDP_PING_SCHEDULED:
- return false;
- case GRPC_BDP_PING_STARTED:
- return false;
- }
- GPR_UNREACHABLE_CODE(return false);
-}
+namespace grpc_core {
-void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator) {
- if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
- gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64,
- estimator->name, estimator->accumulator, estimator->estimate);
- }
- GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_UNSCHEDULED);
- estimator->ping_state = GRPC_BDP_PING_SCHEDULED;
- estimator->accumulator = 0;
-}
+BdpEstimator::BdpEstimator(const char *name)
+ : ping_state_(PingState::UNSCHEDULED),
+ accumulator_(0),
+ estimate_(65536),
+ ping_start_time_(gpr_time_0(GPR_CLOCK_MONOTONIC)),
+ next_ping_scheduled_(0),
+ inter_ping_delay_(100.0), // start at 100ms
+ stable_estimate_count_(0),
+ bw_est_(0),
+ name_(name) {}
-void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) {
- if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
- gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64,
- estimator->name, estimator->accumulator, estimator->estimate);
- }
- GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED);
- estimator->ping_state = GRPC_BDP_PING_STARTED;
- estimator->accumulator = 0;
- estimator->ping_start_time = gpr_now(GPR_CLOCK_MONOTONIC);
-}
-
-void grpc_bdp_estimator_complete_ping(grpc_exec_ctx *exec_ctx,
- grpc_bdp_estimator *estimator) {
+void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- gpr_timespec dt_ts = gpr_time_sub(now, estimator->ping_start_time);
+ gpr_timespec dt_ts = gpr_time_sub(now, ping_start_time_);
double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec;
- double bw = dt > 0 ? ((double)estimator->accumulator / dt) : 0;
- int start_inter_ping_delay = estimator->inter_ping_delay;
+ double bw = dt > 0 ? ((double)accumulator_ / dt) : 0;
+ int start_inter_ping_delay = inter_ping_delay_;
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64
" dt=%lf bw=%lfMbs bw_est=%lfMbs",
- estimator->name, estimator->accumulator, estimator->estimate, dt,
- bw / 125000.0, estimator->bw_est / 125000.0);
+ name_, accumulator_, estimate_, dt, bw / 125000.0,
+ bw_est_ / 125000.0);
}
- GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED);
- if (estimator->accumulator > 2 * estimator->estimate / 3 &&
- bw > estimator->bw_est) {
- estimator->estimate =
- GPR_MAX(estimator->accumulator, estimator->estimate * 2);
- estimator->bw_est = bw;
+ GPR_ASSERT(ping_state_ == PingState::STARTED);
+ if (accumulator_ > 2 * estimate_ / 3 && bw > bw_est_) {
+ estimate_ = GPR_MAX(accumulator_, estimate_ * 2);
+ bw_est_ = bw;
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
- gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64,
- estimator->name, estimator->estimate);
+ gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, name_,
+ estimate_);
}
- estimator->inter_ping_delay /= 2; // if the ping estimate changes,
- // exponentially get faster at probing
- } else if (estimator->inter_ping_delay < 10000) {
- estimator->stable_estimate_count++;
- if (estimator->stable_estimate_count >= 2) {
- estimator->inter_ping_delay +=
+ inter_ping_delay_ /= 2; // if the ping estimate changes,
+ // exponentially get faster at probing
+ } else if (inter_ping_delay_ < 10000) {
+ stable_estimate_count_++;
+ if (stable_estimate_count_ >= 2) {
+ inter_ping_delay_ +=
100 +
(int)(rand() * 100.0 / RAND_MAX); // if the ping estimate is steady,
// slowly ramp down the probe time
}
}
- if (start_inter_ping_delay != estimator->inter_ping_delay) {
- estimator->stable_estimate_count = 0;
+ if (start_inter_ping_delay != inter_ping_delay_) {
+ stable_estimate_count_ = 0;
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
- gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", estimator->name,
- estimator->inter_ping_delay);
+ gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", name_,
+ inter_ping_delay_);
}
}
- estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
- estimator->accumulator = 0;
- estimator->next_ping_scheduled =
- grpc_exec_ctx_now(exec_ctx) + estimator->inter_ping_delay;
+ ping_state_ = PingState::UNSCHEDULED;
+ accumulator_ = 0;
+ next_ping_scheduled_ = grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_;
}
+
+} // namespace grpc_core
diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h
index a9d986782c..6447f2d62c 100644
--- a/src/core/lib/transport/bdp_estimator.h
+++ b/src/core/lib/transport/bdp_estimator.h
@@ -19,67 +19,97 @@
#ifndef GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H
#define GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H
-#include <grpc/support/time.h>
+#include <grpc/support/port_platform.h>
+
+#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
+
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/exec_ctx.h"
-#define GRPC_BDP_SAMPLES 16
-#define GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE 3
+extern grpc_tracer_flag grpc_bdp_estimator_trace;
-#ifdef __cplusplus
-extern "C" {
-#endif
+namespace grpc_core {
-extern grpc_tracer_flag grpc_bdp_estimator_trace;
+class BdpEstimator {
+ public:
+ explicit BdpEstimator(const char *name);
+ ~BdpEstimator() {}
-typedef enum {
- GRPC_BDP_PING_UNSCHEDULED,
- GRPC_BDP_PING_SCHEDULED,
- GRPC_BDP_PING_STARTED
-} grpc_bdp_estimator_ping_state;
+ // Returns true if a reasonable estimate could be obtained
+ bool EstimateBdp(int64_t *estimate_out) const {
+ *estimate_out = estimate_;
+ return true;
+ }
+ bool EstimateBandwidth(double *bw_out) const {
+ *bw_out = bw_est_;
+ return true;
+ }
-typedef struct grpc_bdp_estimator {
- grpc_bdp_estimator_ping_state ping_state;
- int64_t accumulator;
- int64_t estimate;
+ void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; }
+
+ // Returns true if the user should schedule a ping
+ bool NeedPing(grpc_exec_ctx *exec_ctx) const {
+ switch (ping_state_) {
+ case PingState::UNSCHEDULED:
+ return grpc_exec_ctx_now(exec_ctx) >= next_ping_scheduled_;
+ case PingState::SCHEDULED:
+ case PingState::STARTED:
+ return false;
+ }
+ GPR_UNREACHABLE_CODE(return false);
+ }
+
+ // Schedule a ping: call in response to receiving a true from
+ // grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
+ // transport (but not necessarily started)
+ void SchedulePing() {
+ if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
+ gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64, name_,
+ accumulator_, estimate_);
+ }
+ GPR_ASSERT(ping_state_ == PingState::UNSCHEDULED);
+ ping_state_ = PingState::SCHEDULED;
+ accumulator_ = 0;
+ }
+
+ // Start a ping: call after calling grpc_bdp_estimator_schedule_ping and
+ // once
+ // the ping is on the wire
+ void StartPing() {
+ if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
+ gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64, name_,
+ accumulator_, estimate_);
+ }
+ GPR_ASSERT(ping_state_ == PingState::SCHEDULED);
+ ping_state_ = PingState::STARTED;
+ accumulator_ = 0;
+ ping_start_time_ = gpr_now(GPR_CLOCK_MONOTONIC);
+ }
+
+ // Completes a previously started ping
+ void CompletePing(grpc_exec_ctx *exec_ctx);
+
+ private:
+ enum class PingState { UNSCHEDULED, SCHEDULED, STARTED };
+
+ PingState ping_state_;
+ int64_t accumulator_;
+ int64_t estimate_;
// when was the current ping started?
- gpr_timespec ping_start_time;
+ gpr_timespec ping_start_time_;
// when should the next ping start?
- grpc_millis next_ping_scheduled;
- int inter_ping_delay;
- int stable_estimate_count;
- double bw_est;
- const char *name;
-} grpc_bdp_estimator;
-
-void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name);
-
-// Returns true if a reasonable estimate could be obtained
-bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator,
- int64_t *estimate);
-// Tracks new bytes read.
-bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator, double *bw);
-// Returns true if the user should schedule a ping
-void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
- int64_t num_bytes);
-// Returns true if the user should schedule a ping
-bool grpc_bdp_estimator_need_ping(grpc_exec_ctx *exec_ctx,
- const grpc_bdp_estimator *estimator);
-// Schedule a ping: call in response to receiving a true from
-// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
-// transport (but not necessarily started)
-void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator);
-// Start a ping: call after calling grpc_bdp_estimator_schedule_ping and once
-// the ping is on the wire
-void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator);
-// Completes a previously started ping
-void grpc_bdp_estimator_complete_ping(grpc_exec_ctx *exec_ctx,
- grpc_bdp_estimator *estimator);
-
-#ifdef __cplusplus
-}
-#endif
+ grpc_millis next_ping_scheduled_;
+ int inter_ping_delay_;
+ int stable_estimate_count_;
+ double bw_est_;
+ const char *name_;
+};
+
+} // namespace grpc_core
#endif /* GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H */