aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD5
-rw-r--r--bazel/grpc_build_system.bzl4
-rw-r--r--include/grpc++/impl/codegen/proto_utils.h33
-rw-r--r--include/grpc++/server_builder.h5
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc149
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc215
-rw-r--r--src/core/lib/iomgr/exec_ctx.cc70
-rw-r--r--src/core/lib/iomgr/exec_ctx.h2
-rw-r--r--src/core/lib/iomgr/timer_generic.cc28
-rw-r--r--src/core/lib/iomgr/timer_manager.cc5
-rw-r--r--test/cpp/codegen/proto_utils_test.cc110
11 files changed, 403 insertions, 223 deletions
diff --git a/BUILD b/BUILD
index a32aa8d8dd..6e8191af16 100644
--- a/BUILD
+++ b/BUILD
@@ -33,6 +33,11 @@ load(
"grpc_generate_one_off_targets",
)
+config_setting(
+ name = "grpc_no_ares",
+ values = {"define": "grpc_no_ares=true"},
+)
+
# This should be updated along with build.yaml
g_stands_for = "generous"
diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl
index 25d7607e0f..8a8ec9bc84 100644
--- a/bazel/grpc_build_system.bzl
+++ b/bazel/grpc_build_system.bzl
@@ -33,6 +33,10 @@ def grpc_cc_library(name, srcs = [], public_hdrs = [], hdrs = [],
native.cc_library(
name = name,
srcs = srcs,
+ defines = select({
+ "//:grpc_no_ares": ["GRPC_ARES=0"],
+ "//conditions:default": [],
+ }),
hdrs = hdrs + public_hdrs,
deps = deps + ["//external:" + dep for dep in external_deps],
copts = copts,
diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h
index 0f7e115c9a..b7636034d4 100644
--- a/include/grpc++/impl/codegen/proto_utils.h
+++ b/include/grpc++/impl/codegen/proto_utils.h
@@ -41,8 +41,11 @@ const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
public:
- explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
- : block_size_(block_size), byte_count_(0), have_backup_(false) {
+ GrpcBufferWriter(grpc_byte_buffer** bp, int block_size, int total_size)
+ : block_size_(block_size),
+ total_size_(total_size),
+ byte_count_(0),
+ have_backup_(false) {
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.raw.slice_buffer;
}
@@ -54,11 +57,20 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
}
bool Next(void** data, int* size) override {
+ // Protobuf should not ask for more memory than total_size_.
+ GPR_CODEGEN_ASSERT(byte_count_ < total_size_);
if (have_backup_) {
slice_ = backup_slice_;
have_backup_ = false;
} else {
- slice_ = g_core_codegen_interface->grpc_slice_malloc(block_size_);
+ // When less than a whole block is needed, only allocate that much.
+ // But make sure the allocated slice is not inlined.
+ size_t remain = total_size_ - byte_count_ > block_size_
+ ? block_size_
+ : total_size_ - byte_count_;
+ slice_ = g_core_codegen_interface->grpc_slice_malloc(
+ remain > GRPC_SLICE_INLINED_SIZE ? remain
+ : GRPC_SLICE_INLINED_SIZE + 1);
}
*data = GRPC_SLICE_START_PTR(slice_);
// On win x64, int is only 32bit
@@ -70,7 +82,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
void BackUp(int count) override {
g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_);
- if (count == block_size_) {
+ if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) {
backup_slice_ = slice_;
} else {
backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail(
@@ -90,6 +102,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
protected:
friend class GrpcBufferWriterPeer;
const int block_size_;
+ const int total_size_;
int64_t byte_count_;
grpc_slice_buffer* slice_buffer_;
bool have_backup_;
@@ -175,20 +188,20 @@ Status GenericSerialize(const grpc::protobuf::Message& msg,
"BufferWriter must be a subclass of io::ZeroCopyOutputStream");
*own_buffer = true;
int byte_size = msg.ByteSize();
- if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
+ if ((size_t)byte_size <= GRPC_SLICE_INLINED_SIZE) {
grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
GPR_CODEGEN_ASSERT(
GRPC_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
g_core_codegen_interface->grpc_slice_unref(slice);
+
return g_core_codegen_interface->ok();
- } else {
- BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
- return msg.SerializeToZeroCopyStream(&writer)
- ? g_core_codegen_interface->ok()
- : Status(StatusCode::INTERNAL, "Failed to serialize message");
}
+ BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);
+ return msg.SerializeToZeroCopyStream(&writer)
+ ? g_core_codegen_interface->ok()
+ : Status(StatusCode::INTERNAL, "Failed to serialize message");
}
// BufferReader must be a subclass of io::ZeroCopyInputStream.
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index bf842baf6f..0888bef0d9 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -202,10 +202,7 @@ class ServerBuilder {
struct SyncServerSettings {
SyncServerSettings()
- : num_cqs(GPR_MAX(1, gpr_cpu_num_cores())),
- min_pollers(1),
- max_pollers(2),
- cq_timeout_msec(10000) {}
+ : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
/// Number of server completion queues to create to listen to incoming RPCs.
int num_cqs;
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 73a198c62b..6675cacc41 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -175,6 +175,10 @@ typedef struct wrapped_rr_closure_arg {
/* The RR instance related to the closure */
grpc_lb_policy* rr_policy;
+ /* The grpclb instance that created the wrapping. This instance is not owned,
+ * reference counts are untouched. It's used only for logging purposes. */
+ grpc_lb_policy* glb_policy;
+
/* heap memory to be freed upon closure execution. */
void* free_when_done;
} wrapped_rr_closure_arg;
@@ -199,10 +203,11 @@ static void wrapped_rr_closure(grpc_exec_ctx* exec_ctx, void* arg,
wc_arg->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
} else {
- gpr_log(GPR_ERROR,
- "No LB token for connected subchannel pick %p (from RR "
- "instance %p).",
- (void*)*wc_arg->target, (void*)wc_arg->rr_policy);
+ gpr_log(
+ GPR_ERROR,
+ "[grpclb %p] No LB token for connected subchannel pick %p (from RR "
+ "instance %p).",
+ wc_arg->glb_policy, *wc_arg->target, wc_arg->rr_policy);
abort();
}
// Pass on client stats via context. Passes ownership of the reference.
@@ -213,7 +218,8 @@ static void wrapped_rr_closure(grpc_exec_ctx* exec_ctx, void* arg,
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
}
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Unreffing RR %p", (void*)wc_arg->rr_policy);
+ gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy,
+ wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
}
@@ -619,8 +625,10 @@ static void update_lb_connectivity_status_locked(
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(
- GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.",
- grpc_connectivity_state_name(rr_state), (void*)glb_policy->rr_policy);
+ GPR_INFO,
+ "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
+ glb_policy, grpc_connectivity_state_name(rr_state),
+ glb_policy->rr_policy);
}
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state,
rr_state_error,
@@ -647,8 +655,8 @@ static bool pick_from_internal_rr_locked(
if (server->drop) {
// Not using the RR policy, so unref it.
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
- (intptr_t)wc_arg->rr_policy);
+ gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy,
+ wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
// Update client load reporting stats to indicate the number of
@@ -656,6 +664,7 @@ static bool pick_from_internal_rr_locked(
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
+ GPR_ASSERT(wc_arg->client_stats != NULL);
grpc_grpclb_client_stats_add_call_dropped_locked(
server->load_balance_token, wc_arg->client_stats);
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
@@ -676,8 +685,8 @@ static bool pick_from_internal_rr_locked(
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
- (intptr_t)wc_arg->rr_policy);
+ gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy,
+ wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
/* add the load reporting initial metadata */
@@ -748,12 +757,13 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
grpc_lb_policy_create(exec_ctx, "round_robin", args);
if (new_rr_policy == NULL) {
gpr_log(GPR_ERROR,
- "Failure creating a RoundRobin policy for serverlist update with "
- "%lu entries. The previous RR instance (%p), if any, will continue "
- "to be used. Future updates from the LB will attempt to create new "
+ "[grpclb %p] Failure creating a RoundRobin policy for serverlist "
+ "update with %" PRIuPTR
+ " entries. The previous RR instance (%p), if any, will continue to "
+ "be used. Future updates from the LB will attempt to create new "
"instances.",
- (unsigned long)glb_policy->serverlist->num_servers,
- (void*)glb_policy->rr_policy);
+ glb_policy, glb_policy->serverlist->num_servers,
+ glb_policy->rr_policy);
return;
}
glb_policy->rr_policy = new_rr_policy;
@@ -797,8 +807,9 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
pp->wrapped_on_complete_arg.client_stats =
grpc_grpclb_client_stats_ref(glb_policy->client_stats);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p",
- (void*)glb_policy->rr_policy);
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Pending pick about to (async) PICK from RR %p",
+ glb_policy, glb_policy->rr_policy);
}
pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args,
true /* force_async */, pp->target,
@@ -811,8 +822,8 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
- (intptr_t)glb_policy->rr_policy);
+ gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
+ glb_policy, glb_policy->rr_policy);
}
grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy,
&pping->wrapped_notify_arg.wrapper_closure);
@@ -827,15 +838,15 @@ static void rr_handover_locked(grpc_exec_ctx* exec_ctx,
GPR_ASSERT(args != NULL);
if (glb_policy->rr_policy != NULL) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)",
- (void*)glb_policy->rr_policy);
+ gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", glb_policy,
+ glb_policy->rr_policy);
}
grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args);
} else {
create_rr_locked(exec_ctx, glb_policy, args);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)",
- (void*)glb_policy->rr_policy);
+ gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", glb_policy,
+ glb_policy->rr_policy);
}
}
lb_policy_args_destroy(exec_ctx, args);
@@ -1177,8 +1188,8 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
- "grpclb %p NOT picking from from RR %p: RR conn state=%s",
- (void*)glb_policy, (void*)glb_policy->rr_policy,
+ "[grpclb %p] NOT picking from from RR %p: RR conn state=%s",
+ glb_policy, glb_policy->rr_policy,
grpc_connectivity_state_name(rr_connectivity_state));
}
add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
@@ -1186,8 +1197,8 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
pick_done = false;
} else { // RR not in shutdown
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p",
- (void*)glb_policy, (void*)glb_policy->rr_policy);
+ gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
+ glb_policy->rr_policy);
}
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
wrapped_rr_closure_arg* wc_arg =
@@ -1204,6 +1215,7 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
wc_arg->initial_metadata = pick_args->initial_metadata;
wc_arg->free_when_done = wc_arg;
+ wc_arg->glb_policy = pol;
pick_done =
pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
false /* force_async */, target, wc_arg);
@@ -1211,9 +1223,8 @@ static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
} else { // glb_policy->rr_policy == NULL
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG,
- "No RR policy in grpclb instance %p. Adding to grpclb's pending "
- "picks",
- (void*)(glb_policy));
+ "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
+ glb_policy);
}
add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
on_complete);
@@ -1262,8 +1273,7 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (!glb_policy->shutting_down && glb_policy->lb_call == NULL &&
error == GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
- (void*)glb_policy);
+ gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy);
}
query_for_backends_locked(exec_ctx, glb_policy);
}
@@ -1284,14 +1294,16 @@ static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx,
grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state)
.next_attempt_start_time;
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
- (void*)glb_policy);
+ gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
+ glb_policy);
grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx);
if (timeout > 0) {
- gpr_log(GPR_DEBUG, "... retry_timer_active in %" PRIdPTR "ms.",
- timeout);
+ gpr_log(GPR_DEBUG,
+ "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
+ glb_policy, timeout);
} else {
- gpr_log(GPR_DEBUG, "... retry_timer_active immediately.");
+ gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
+ glb_policy);
}
}
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
@@ -1392,7 +1404,7 @@ static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg,
exec_ctx, glb_policy->lb_call, &op, 1,
&glb_policy->client_load_report_closure);
if (call_error != GRPC_CALL_OK) {
- gpr_log(GPR_ERROR, "call_error=%d", call_error);
+ gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
}
@@ -1486,9 +1498,8 @@ static void query_for_backends_locked(grpc_exec_ctx* exec_ctx,
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
- "Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)",
- (void*)glb_policy, (void*)glb_policy->lb_channel,
- (void*)glb_policy->lb_call);
+ "[grpclb %p] Query for backends (lb_channel: %p, lb_call: %p)",
+ glb_policy, glb_policy->lb_channel, glb_policy->lb_call);
}
GPR_ASSERT(glb_policy->lb_call != NULL);
@@ -1578,9 +1589,9 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
&response->client_stats_report_interval));
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
- "received initial LB response message; "
+ "[grpclb %p] Received initial LB response message; "
"client load reporting interval = %" PRIdPTR " milliseconds",
- glb_policy->client_stats_report_interval);
+ glb_policy, glb_policy->client_stats_report_interval);
}
/* take a weak ref (won't prevent calling of \a glb_shutdown() if the
* strong ref count goes to zero) to be unref'd in
@@ -1590,8 +1601,9 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
schedule_next_client_load_report(exec_ctx, glb_policy);
} else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
- "received initial LB response message; "
- "client load reporting NOT enabled");
+ "[grpclb %p] Received initial LB response message; client load "
+ "reporting NOT enabled",
+ glb_policy);
}
grpc_grpclb_initial_response_destroy(response);
glb_policy->seen_initial_response = true;
@@ -1601,14 +1613,16 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (serverlist != NULL) {
GPR_ASSERT(glb_policy->lb_call != NULL);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Serverlist with %lu servers received",
- (unsigned long)serverlist->num_servers);
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
+ glb_policy, serverlist->num_servers);
for (size_t i = 0; i < serverlist->num_servers; ++i) {
grpc_resolved_address addr;
parse_server(serverlist->servers[i], &addr);
char* ipport;
grpc_sockaddr_to_string(&ipport, &addr, false);
- gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
+ gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
+ glb_policy, i, ipport);
gpr_free(ipport);
}
}
@@ -1618,7 +1632,9 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
serverlist)) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
- "Incoming server list identical to current, ignoring.");
+ "[grpclb %p] Incoming server list identical to current, "
+ "ignoring.",
+ glb_policy);
}
grpc_grpclb_destroy_serverlist(serverlist);
} else { /* new serverlist */
@@ -1644,12 +1660,16 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
}
} else {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Received empty server list, ignoring.");
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Received empty server list, ignoring.",
+ glb_policy);
}
grpc_grpclb_destroy_serverlist(serverlist);
}
} else { /* serverlist == NULL */
- gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
+ gpr_log(GPR_ERROR,
+ "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
+ glb_policy,
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
}
}
@@ -1689,8 +1709,8 @@ static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
- "Falling back to use backends from resolver (grpclb %p)",
- (void*)glb_policy);
+ "[grpclb %p] Falling back to use backends from resolver",
+ glb_policy);
}
GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
rr_handover_locked(exec_ctx, glb_policy);
@@ -1708,10 +1728,10 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx,
char* status_details =
grpc_slice_to_c_string(glb_policy->lb_call_status_details);
gpr_log(GPR_INFO,
- "Status from LB server received. Status = %d, Details = '%s', "
- "(call: %p), error %p",
- glb_policy->lb_call_status, status_details,
- (void*)glb_policy->lb_call, (void*)error);
+ "[grpclb %p] Status from LB server received. Status = %d, Details "
+ "= '%s', (call: %p), error '%s'",
+ glb_policy, glb_policy->lb_call_status, status_details,
+ glb_policy->lb_call, grpc_error_string(error));
gpr_free(status_details);
}
/* We need to perform cleanups no matter what. */
@@ -1752,10 +1772,10 @@ static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
"glb_update_missing");
} else {
// otherwise, keep using the current LB channel (ignore this update).
- gpr_log(GPR_ERROR,
- "No valid LB addresses channel arg for grpclb %p update, "
- "ignoring.",
- (void*)glb_policy);
+ gpr_log(
+ GPR_ERROR,
+ "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
+ glb_policy);
}
return;
}
@@ -1887,8 +1907,9 @@ static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx,
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
- glb_policy->server_name);
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Will use '%s' as the server name for LB request.",
+ glb_policy, glb_policy->server_name);
}
grpc_uri_destroy(uri);
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 125a4186aa..c79ee5687d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -440,128 +440,111 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
// for a subchannel in p->latest_pending_subchannel_list. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
- sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
- }
- while (true) {
- switch (sd->curr_connectivity_state) {
- case GRPC_CHANNEL_READY: {
- // Case 2. Promote p->latest_pending_subchannel_list to
- // p->subchannel_list.
- if (sd->subchannel_list == p->latest_pending_subchannel_list) {
- GPR_ASSERT(p->subchannel_list != NULL);
- grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->subchannel_list, "finish_update");
- p->subchannel_list = p->latest_pending_subchannel_list;
- p->latest_pending_subchannel_list = NULL;
- }
- // Cases 1 and 2.
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
- "connecting_ready");
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "connected");
- p->selected = sd;
+ switch (sd->curr_connectivity_state) {
+ case GRPC_CHANNEL_READY: {
+ // Case 2. Promote p->latest_pending_subchannel_list to
+ // p->subchannel_list.
+ if (sd->subchannel_list == p->latest_pending_subchannel_list) {
+ GPR_ASSERT(p->subchannel_list != NULL);
+ grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ "finish_update");
+ p->subchannel_list = p->latest_pending_subchannel_list;
+ p->latest_pending_subchannel_list = NULL;
+ }
+ // Cases 1 and 2.
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
+ "connecting_ready");
+ sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(sd->subchannel),
+ "connected");
+ p->selected = sd;
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
+ (void*)sd->subchannel);
+ }
+ // Drop all other subchannels, since we are now connected.
+ destroy_unselected_subchannels_locked(exec_ctx, p);
+ // Update any calls that were waiting for a pick.
+ pending_pick* pp;
+ while ((pp = p->pending_picks)) {
+ p->pending_picks = pp->next;
+ *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ p->selected->connected_subchannel, "picked");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
- (void*)sd->subchannel);
- }
- // Drop all other subchannels, since we are now connected.
- destroy_unselected_subchannels_locked(exec_ctx, p);
- // Update any calls that were waiting for a pick.
- pending_pick* pp;
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
- p->selected->connected_subchannel, "picked");
- if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO,
- "Servicing pending pick with selected subchannel %p",
- (void*)p->selected);
- }
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
- gpr_free(pp);
+ gpr_log(GPR_INFO,
+ "Servicing pending pick with selected subchannel %p",
+ (void*)p->selected);
}
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
- return;
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ gpr_free(pp);
}
- case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == NULL);
- // Case 1: Only set state to TRANSIENT_FAILURE if we've tried
- // all subchannels.
- if (sd->subchannel_list->checking_subchannel == 0 &&
- sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "connecting_transient_failure");
- }
- sd->curr_connectivity_state =
- grpc_subchannel_check_connectivity(sd->subchannel, &error);
- GRPC_ERROR_UNREF(error);
- if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
- return;
- }
- break; // Go back to top of loop.
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ break;
+ }
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ do {
+ sd->subchannel_list->checking_subchannel =
+ (sd->subchannel_list->checking_subchannel + 1) %
+ sd->subchannel_list->num_subchannels;
+ sd = &sd->subchannel_list
+ ->subchannels[sd->subchannel_list->checking_subchannel];
+ } while (sd->subchannel == NULL);
+ // Case 1: Only set state to TRANSIENT_FAILURE if we've tried
+ // all subchannels.
+ if (sd->subchannel_list->checking_subchannel == 0 &&
+ sd->subchannel_list == p->subchannel_list) {
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "connecting_transient_failure");
}
- case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_IDLE: {
- // Only update connectivity state in case 1.
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_REF(error), "connecting_changed");
- }
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
- return;
+ // Reuses the connectivity refs from the previous watch.
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ break;
+ }
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_IDLE: {
+ // Only update connectivity state in case 1.
+ if (sd->subchannel_list == p->subchannel_list) {
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_REF(error), "connecting_changed");
}
- case GRPC_CHANNEL_SHUTDOWN: {
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
- "pf_candidate_shutdown");
- // Advance to next subchannel and check its state.
- grpc_lb_subchannel_data* original_sd = sd;
- do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == NULL && sd != original_sd);
- if (sd == original_sd) {
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "pf_candidate_shutdown");
- shutdown_locked(exec_ctx, p,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick first exhausted channels", &error, 1));
- return;
- }
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "subchannel_failed");
- }
- sd->curr_connectivity_state =
- grpc_subchannel_check_connectivity(sd->subchannel, &error);
- GRPC_ERROR_UNREF(error);
- if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
- return;
- }
- // For any other state, go back to top of loop.
- // We will reuse the connectivity refs from the previous watch.
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ break;
+ }
+ case GRPC_CHANNEL_SHUTDOWN: {
+ grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
+ "pf_candidate_shutdown");
+ // Advance to next subchannel and check its state.
+ grpc_lb_subchannel_data* original_sd = sd;
+ do {
+ sd->subchannel_list->checking_subchannel =
+ (sd->subchannel_list->checking_subchannel + 1) %
+ sd->subchannel_list->num_subchannels;
+ sd = &sd->subchannel_list
+ ->subchannels[sd->subchannel_list->checking_subchannel];
+ } while (sd->subchannel == NULL && sd != original_sd);
+ if (sd == original_sd) {
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ exec_ctx, sd->subchannel_list, "pf_candidate_shutdown");
+ shutdown_locked(exec_ctx, p,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Pick first exhausted channels", &error, 1));
+ break;
+ }
+ if (sd->subchannel_list == p->subchannel_list) {
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "subchannel_failed");
}
+ // Reuses the connectivity refs from the previous watch.
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ break;
}
}
}
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index 0a0ed8a055..423e8d059f 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -25,6 +25,9 @@
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/profiling/timers.h"
+#define GRPC_START_TIME_UPDATE_INTERVAL 10000
+extern "C" grpc_tracer_flag grpc_timer_check_trace;
+
bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx* exec_ctx) {
if ((exec_ctx->flags & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
if (exec_ctx->check_ready_to_finish(exec_ctx,
@@ -104,23 +107,49 @@ static void exec_ctx_sched(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
}
-static gpr_timespec
+/* This time pair is not entirely thread-safe as store/load of tv_sec and
+ * tv_nsec are performed separately. However g_start_time do not need to have
+ * sub-second precision, so it is ok if the value of tv_nsec is off in this
+ * case. */
+typedef struct time_atm_pair {
+ gpr_atm tv_sec;
+ gpr_atm tv_nsec;
+} time_atm_pair;
+
+static time_atm_pair
g_start_time[GPR_TIMESPAN + 1]; // assumes GPR_TIMESPAN is the
// last enum value in
// gpr_clock_type
+static grpc_millis g_last_start_time_update;
+
+static gpr_timespec timespec_from_time_atm_pair(const time_atm_pair* src,
+ gpr_clock_type clock_type) {
+ gpr_timespec time;
+ time.tv_nsec = (int32_t)gpr_atm_no_barrier_load(&src->tv_nsec);
+ time.tv_sec = (int64_t)gpr_atm_no_barrier_load(&src->tv_sec);
+ time.clock_type = clock_type;
+ return time;
+}
+
+static void time_atm_pair_store(time_atm_pair* dst, const gpr_timespec src) {
+ gpr_atm_no_barrier_store(&dst->tv_sec, src.tv_sec);
+ gpr_atm_no_barrier_store(&dst->tv_nsec, src.tv_nsec);
+}
void grpc_exec_ctx_global_init(void) {
for (int i = 0; i < GPR_TIMESPAN; i++) {
- g_start_time[i] = gpr_now((gpr_clock_type)i);
+ time_atm_pair_store(&g_start_time[i], gpr_now((gpr_clock_type)i));
}
// allows uniform treatment in conversion functions
- g_start_time[GPR_TIMESPAN] = gpr_time_0(GPR_TIMESPAN);
+ time_atm_pair_store(&g_start_time[GPR_TIMESPAN], gpr_time_0(GPR_TIMESPAN));
}
void grpc_exec_ctx_global_shutdown(void) {}
static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
- ts = gpr_time_sub(ts, g_start_time[ts.clock_type]);
+ gpr_timespec start_time =
+ timespec_from_time_atm_pair(&g_start_time[ts.clock_type], ts.clock_type);
+ ts = gpr_time_sub(ts, start_time);
double x =
GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS;
if (x < 0) return 0;
@@ -129,7 +158,9 @@ static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
}
static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) {
- ts = gpr_time_sub(ts, g_start_time[ts.clock_type]);
+ gpr_timespec start_time =
+ timespec_from_time_atm_pair(&g_start_time[ts.clock_type], ts.clock_type);
+ ts = gpr_time_sub(ts, start_time);
double x = GPR_MS_PER_SEC * (double)ts.tv_sec +
(double)ts.tv_nsec / GPR_NS_PER_MS +
(double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC;
@@ -164,8 +195,9 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis,
if (clock_type == GPR_TIMESPAN) {
return gpr_time_from_millis(millis, GPR_TIMESPAN);
}
- return gpr_time_add(g_start_time[clock_type],
- gpr_time_from_millis(millis, GPR_TIMESPAN));
+ gpr_timespec start_time =
+ timespec_from_time_atm_pair(&g_start_time[clock_type], clock_type);
+ return gpr_time_add(start_time, gpr_time_from_millis(millis, GPR_TIMESPAN));
}
grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) {
@@ -176,6 +208,30 @@ grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) {
return timespec_to_atm_round_up(ts);
}
+void grpc_exec_ctx_maybe_update_start_time(grpc_exec_ctx* exec_ctx) {
+ grpc_millis now = grpc_exec_ctx_now(exec_ctx);
+ grpc_millis last_start_time_update =
+ gpr_atm_no_barrier_load(&g_last_start_time_update);
+
+ if (now > last_start_time_update &&
+ now - last_start_time_update > GRPC_START_TIME_UPDATE_INTERVAL) {
+ /* Get the current system time and subtract \a now from it, where \a now is
+ * the relative time from grpc_init() from monotonic clock. This calibrates
+ * the time when grpc_exec_ctx_global_init was called based on current
+ * system clock. */
+ gpr_atm_no_barrier_store(&g_last_start_time_update, now);
+ gpr_timespec real_now = gpr_now(GPR_CLOCK_REALTIME);
+ gpr_timespec real_start_time =
+ gpr_time_sub(real_now, gpr_time_from_millis(now, GPR_TIMESPAN));
+ time_atm_pair_store(&g_start_time[GPR_CLOCK_REALTIME], real_start_time);
+
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "Update realtime clock start time: %" PRId64 "s %dns",
+ real_start_time.tv_sec, real_start_time.tv_nsec);
+ }
+ }
+}
+
static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = {
exec_ctx_run, exec_ctx_sched, "exec_ctx"};
static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable};
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index bd27506152..6035e08361 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -124,6 +124,8 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock);
grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec);
grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec);
+void grpc_exec_ctx_maybe_update_start_time(grpc_exec_ctx* exec_ctx);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc
index 2333f180d4..38ac66ea2f 100644
--- a/src/core/lib/iomgr/timer_generic.cc
+++ b/src/core/lib/iomgr/timer_generic.cc
@@ -25,6 +25,7 @@
#include "src/core/lib/iomgr/timer.h"
#include <grpc/support/alloc.h>
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
@@ -37,8 +38,6 @@
#define INVALID_HEAP_INDEX 0xffffffffu
-#define LOG2_NUM_SHARDS 5
-#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
#define ADD_DEADLINE_SCALE 0.33
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1
@@ -74,14 +73,16 @@ typedef struct {
grpc_timer list;
} timer_shard;
+static size_t g_num_shards;
+
/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
* is hashed to select the timer shard to add the timer to */
-static timer_shard g_shards[NUM_SHARDS];
+static timer_shard* g_shards;
/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
* the deadline of the next timer in each shard).
* Access to this is protected by g_shared_mutables.mu */
-static timer_shard* g_shard_queue[NUM_SHARDS];
+static timer_shard** g_shard_queue;
#ifndef NDEBUG
@@ -241,6 +242,11 @@ static gpr_atm compute_min_deadline(timer_shard* shard) {
void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) {
uint32_t i;
+ g_num_shards = GPR_MIN(1, 2 * gpr_cpu_num_cores());
+ g_shards = (timer_shard*)gpr_zalloc(g_num_shards * sizeof(*g_shards));
+ g_shard_queue =
+ (timer_shard**)gpr_zalloc(g_num_shards * sizeof(*g_shard_queue));
+
g_shared_mutables.initialized = true;
g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
gpr_mu_init(&g_shared_mutables.mu);
@@ -250,7 +256,7 @@ void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) {
grpc_register_tracer(&grpc_timer_trace);
grpc_register_tracer(&grpc_timer_check_trace);
- for (i = 0; i < NUM_SHARDS; i++) {
+ for (i = 0; i < g_num_shards; i++) {
timer_shard* shard = &g_shards[i];
gpr_mu_init(&shard->mu);
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
@@ -267,17 +273,19 @@ void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) {
}
void grpc_timer_list_shutdown(grpc_exec_ctx* exec_ctx) {
- int i;
+ size_t i;
run_some_expired_timers(
exec_ctx, GPR_ATM_MAX, NULL,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
- for (i = 0; i < NUM_SHARDS; i++) {
+ for (i = 0; i < g_num_shards; i++) {
timer_shard* shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
grpc_timer_heap_destroy(&shard->heap);
}
gpr_mu_destroy(&g_shared_mutables.mu);
gpr_tls_destroy(&g_last_seen_min_timer);
+ gpr_free(g_shards);
+ gpr_free(g_shard_queue);
g_shared_mutables.initialized = false;
}
@@ -311,7 +319,7 @@ static void note_deadline_change(timer_shard* shard) {
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
}
- while (shard->shard_queue_index < NUM_SHARDS - 1 &&
+ while (shard->shard_queue_index < g_num_shards - 1 &&
shard->min_deadline >
g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
swap_adjacent_shards_in_queue(shard->shard_queue_index);
@@ -323,7 +331,7 @@ void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; }
void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer,
grpc_millis deadline, grpc_closure* closure) {
int is_first_timer = 0;
- timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+ timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
timer->closure = closure;
timer->deadline = deadline;
@@ -417,7 +425,7 @@ void grpc_timer_cancel(grpc_exec_ctx* exec_ctx, grpc_timer* timer) {
return;
}
- timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+ timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
gpr_mu_lock(&shard->mu);
if (GRPC_TRACER_ON(grpc_timer_trace)) {
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc
index 16e9e7e707..bb2a0e0d44 100644
--- a/src/core/lib/iomgr/timer_manager.cc
+++ b/src/core/lib/iomgr/timer_manager.cc
@@ -225,6 +225,11 @@ static void timer_main_loop(grpc_exec_ctx* exec_ctx) {
for (;;) {
grpc_millis next = GRPC_MILLIS_INF_FUTURE;
grpc_exec_ctx_invalidate_now(exec_ctx);
+
+ /* Calibrate g_start_time in exec_ctx.cc with a regular interval in case the
+ * system clock has changed */
+ grpc_exec_ctx_maybe_update_start_time(exec_ctx);
+
// check timer state, updates next to the next time to run a check
switch (grpc_timer_check(exec_ctx, &next)) {
case GRPC_TIMERS_FIRED:
diff --git a/test/cpp/codegen/proto_utils_test.cc b/test/cpp/codegen/proto_utils_test.cc
index fd05c90e9d..ba89b299ef 100644
--- a/test/cpp/codegen/proto_utils_test.cc
+++ b/test/cpp/codegen/proto_utils_test.cc
@@ -16,15 +16,16 @@
*
*/
+#include <grpc++/impl/codegen/grpc_library.h>
#include <grpc++/impl/codegen/proto_utils.h>
#include <grpc++/impl/grpc_library.h>
+#include <grpc/impl/codegen/byte_buffer.h>
+#include <grpc/slice.h>
#include <gtest/gtest.h>
namespace grpc {
namespace internal {
-static GrpcLibraryInitializer g_gli_initializer;
-
// Provide access to GrpcBufferWriter internals.
class GrpcBufferWriterPeer {
public:
@@ -44,35 +45,120 @@ class ProtoUtilsTest : public ::testing::Test {};
// GrpcBufferWriter Next()/Backup() invocations could result in a dangling
// pointer returned by Next() due to the interaction between grpc_slice inlining
// and GRPC_SLICE_START_PTR.
-TEST_F(ProtoUtilsTest, BackupNext) {
- // Ensure the GrpcBufferWriter internals are initialized.
- g_gli_initializer.summon();
-
+TEST_F(ProtoUtilsTest, TinyBackupThenNext) {
grpc_byte_buffer* bp;
- GrpcBufferWriter writer(&bp, 8192);
+ const int block_size = 1024;
+ GrpcBufferWriter writer(&bp, block_size, 8192);
GrpcBufferWriterPeer peer(&writer);
void* data;
int size;
// Allocate a slice.
ASSERT_TRUE(writer.Next(&data, &size));
- EXPECT_EQ(8192, size);
- // Return a single byte. Before the fix that this test acts as a regression
- // for, this would have resulted in an inlined backup slice.
+ EXPECT_EQ(block_size, size);
+ // Return a single byte.
writer.BackUp(1);
- EXPECT_TRUE(!peer.have_backup());
- // On the next allocation, the slice is non-inlined.
+ EXPECT_FALSE(peer.have_backup());
+ // On the next allocation, the returned slice is non-inlined.
ASSERT_TRUE(writer.Next(&data, &size));
EXPECT_TRUE(peer.slice().refcount != NULL);
+ EXPECT_EQ(block_size, size);
// Cleanup.
g_core_codegen_interface->grpc_byte_buffer_destroy(bp);
}
+namespace {
+
+// Set backup_size to 0 to indicate no backup is needed.
+void BufferWriterTest(int block_size, int total_size, int backup_size) {
+ grpc_byte_buffer* bp;
+ GrpcBufferWriter writer(&bp, block_size, total_size);
+
+ int written_size = 0;
+ void* data;
+ int size = 0;
+ bool backed_up_entire_slice = false;
+
+ while (written_size < total_size) {
+ EXPECT_TRUE(writer.Next(&data, &size));
+ EXPECT_GT(size, 0);
+ EXPECT_TRUE(data);
+ int write_size = size;
+ bool should_backup = false;
+ if (backup_size > 0 && size > backup_size) {
+ write_size = size - backup_size;
+ should_backup = true;
+ } else if (size == backup_size && !backed_up_entire_slice) {
+ // only backup entire slice once.
+ backed_up_entire_slice = true;
+ should_backup = true;
+ write_size = 0;
+ }
+ // May need a last backup.
+ if (write_size + written_size > total_size) {
+ write_size = total_size - written_size;
+ should_backup = true;
+ backup_size = size - write_size;
+ ASSERT_GT(backup_size, 0);
+ }
+ for (int i = 0; i < write_size; i++) {
+ ((uint8_t*)data)[i] = written_size % 128;
+ written_size++;
+ }
+ if (should_backup) {
+ writer.BackUp(backup_size);
+ }
+ }
+ EXPECT_EQ(grpc_byte_buffer_length(bp), (size_t)total_size);
+
+ grpc_byte_buffer_reader reader;
+ grpc_byte_buffer_reader_init(&reader, bp);
+ int read_bytes = 0;
+ while (read_bytes < total_size) {
+ grpc_slice s;
+ EXPECT_TRUE(grpc_byte_buffer_reader_next(&reader, &s));
+ for (size_t i = 0; i < GRPC_SLICE_LENGTH(s); i++) {
+ EXPECT_EQ(GRPC_SLICE_START_PTR(s)[i], read_bytes % 128);
+ read_bytes++;
+ }
+ grpc_slice_unref(s);
+ }
+ EXPECT_EQ(read_bytes, total_size);
+ grpc_byte_buffer_reader_destroy(&reader);
+ grpc_byte_buffer_destroy(bp);
+}
+
+TEST(WriterTest, TinyBlockTinyBackup) {
+ for (int i = 2; i < (int)GRPC_SLICE_INLINED_SIZE; i++) {
+ BufferWriterTest(i, 256, 1);
+ }
+}
+
+TEST(WriterTest, SmallBlockTinyBackup) { BufferWriterTest(64, 256, 1); }
+
+TEST(WriterTest, SmallBlockNoBackup) { BufferWriterTest(64, 256, 0); }
+
+TEST(WriterTest, SmallBlockFullBackup) { BufferWriterTest(64, 256, 64); }
+
+TEST(WriterTest, LargeBlockTinyBackup) { BufferWriterTest(4096, 8192, 1); }
+
+TEST(WriterTest, LargeBlockNoBackup) { BufferWriterTest(4096, 8192, 0); }
+
+TEST(WriterTest, LargeBlockFullBackup) { BufferWriterTest(4096, 8192, 4096); }
+
+TEST(WriterTest, LargeBlockLargeBackup) { BufferWriterTest(4096, 8192, 4095); }
+
+} // namespace
} // namespace internal
} // namespace grpc
int main(int argc, char** argv) {
+ // Ensure the GrpcBufferWriter internals are initialized.
+ grpc::internal::GrpcLibraryInitializer init;
+ init.summon();
+ grpc::GrpcLibraryCodegen lib;
+
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}