aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2018-01-19 12:20:15 -0800
committerGravatar Muxi Yan <mxyan@google.com>2018-01-19 12:20:15 -0800
commitfb061c329ac5086b62619b118191574f3fd27a79 (patch)
tree6a855770d7218425a2cd53bc378800c30d6c8525 /src/core/ext/filters
parent032e9b32dc5978a042bdda5c3031ae6cbd928973 (diff)
parent9813858908da2e7deaa58d91f4030e6881e364e3 (diff)
Merge remote-tracking branch 'upstream/master' into fix-stream-compression-config-interface
Diffstat (limited to 'src/core/ext/filters')
-rw-r--r--src/core/ext/filters/client_channel/OWNERS2
-rw-r--r--src/core/ext/filters/client_channel/backup_poller.cc10
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.cc6
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc77
-rw-r--r--src/core/ext/filters/client_channel/http_connect_handshaker.cc4
-rw-r--r--src/core/ext/filters/client_channel/http_proxy.cc4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.cc95
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h94
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc6
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc743
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc154
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc172
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc9
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h3
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_registry.cc2
-rw-r--r--src/core/ext/filters/client_channel/parse_address.cc2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc27
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc27
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc2
-rw-r--r--src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc2
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc419
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h81
-rw-r--r--src/core/ext/filters/client_channel/uri_parser.cc2
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.cc12
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.cc16
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.cc10
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.cc10
-rw-r--r--src/core/ext/filters/max_age/max_age_filter.cc6
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.cc12
-rw-r--r--src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc6
34 files changed, 903 insertions, 1120 deletions
diff --git a/src/core/ext/filters/client_channel/OWNERS b/src/core/ext/filters/client_channel/OWNERS
index 773bc73179..8f5e92808e 100644
--- a/src/core/ext/filters/client_channel/OWNERS
+++ b/src/core/ext/filters/client_channel/OWNERS
@@ -1,4 +1,4 @@
set noparent
@markdroth
@dgquintas
-@ctiller
+@a11r
diff --git a/src/core/ext/filters/client_channel/backup_poller.cc b/src/core/ext/filters/client_channel/backup_poller.cc
index bfc549e709..906a72b662 100644
--- a/src/core/ext/filters/client_channel/backup_poller.cc
+++ b/src/core/ext/filters/client_channel/backup_poller.cc
@@ -23,17 +23,18 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/support/env.h"
-#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#define DEFAULT_POLL_INTERVAL_MS 5000
-typedef struct backup_poller {
+namespace {
+struct backup_poller {
grpc_timer polling_timer;
grpc_closure run_poller_closure;
grpc_closure shutdown_closure;
@@ -42,7 +43,8 @@ typedef struct backup_poller {
bool shutting_down; // guarded by pollset_mu
gpr_refcount refs;
gpr_refcount shutdown_refs;
-} backup_poller;
+};
+} // namespace
static gpr_once g_once = GPR_ONCE_INIT;
static gpr_mu g_poller_mu;
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc
index 20693ba419..a827aa30ec 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.cc
+++ b/src/core/ext/filters/client_channel/channel_connectivity.cc
@@ -58,7 +58,8 @@ typedef enum {
CALLING_BACK_AND_FINISHED,
} callback_phase;
-typedef struct {
+namespace {
+struct state_watcher {
gpr_mu mu;
callback_phase phase;
grpc_closure on_complete;
@@ -71,7 +72,8 @@ typedef struct {
grpc_channel* channel;
grpc_error* error;
void* tag;
-} state_watcher;
+};
+} // namespace
static void delete_state_watcher(state_watcher* w) {
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index e99022a91b..a8a7a37be0 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -41,12 +41,12 @@
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
@@ -553,6 +553,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
}
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
chand->interested_parties);
+ grpc_lb_policy_shutdown_locked(chand->lb_policy, new_lb_policy);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
}
chand->lb_policy = new_lb_policy;
@@ -658,6 +659,7 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
if (chand->lb_policy != nullptr) {
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
chand->interested_parties);
+ grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
chand->lb_policy = nullptr;
}
@@ -792,6 +794,7 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
if (chand->lb_policy != nullptr) {
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
chand->interested_parties);
+ grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
}
gpr_free(chand->info_lb_policy_name);
@@ -852,12 +855,10 @@ typedef struct client_channel_call_data {
grpc_subchannel_call* subchannel_call;
grpc_error* error;
- grpc_lb_policy* lb_policy; // Holds ref while LB pick is pending.
+ grpc_lb_policy_pick_state pick;
grpc_closure lb_pick_closure;
grpc_closure lb_pick_cancel_closure;
- grpc_connected_subchannel* connected_subchannel;
- grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
grpc_polling_entity* pollent;
grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES];
@@ -866,8 +867,6 @@ typedef struct client_channel_call_data {
grpc_transport_stream_op_batch* initial_metadata_batch;
- grpc_linked_mdelem lb_token_mdelem;
-
grpc_closure on_complete;
grpc_closure* original_on_complete;
} call_data;
@@ -1004,17 +1003,17 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
grpc_error* error) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
- const grpc_connected_subchannel_call_args call_args = {
- calld->pollent, // pollent
- calld->path, // path
- calld->call_start_time, // start_time
- calld->deadline, // deadline
- calld->arena, // arena
- calld->subchannel_call_context, // context
- calld->call_combiner // call_combiner
+ const grpc_core::ConnectedSubchannel::CallArgs call_args = {
+ calld->pollent, // pollent
+ calld->path, // path
+ calld->call_start_time, // start_time
+ calld->deadline, // deadline
+ calld->arena, // arena
+ calld->pick.subchannel_call_context, // context
+ calld->call_combiner // call_combiner
};
- grpc_error* new_error = grpc_connected_subchannel_create_call(
- calld->connected_subchannel, &call_args, &calld->subchannel_call);
+ grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
+ call_args, &calld->subchannel_call);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, calld, calld->subchannel_call, grpc_error_string(new_error));
@@ -1032,7 +1031,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
call_data* calld = (call_data*)elem->call_data;
channel_data* chand = (channel_data*)elem->channel_data;
- if (calld->connected_subchannel == nullptr) {
+ if (calld->pick.connected_subchannel == nullptr) {
// Failed to create subchannel.
GRPC_ERROR_UNREF(calld->error);
calld->error = error == GRPC_ERROR_NONE
@@ -1071,13 +1070,16 @@ static void pick_callback_cancel_locked(void* arg, grpc_error* error) {
grpc_call_element* elem = (grpc_call_element*)arg;
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
- if (calld->lb_policy != nullptr) {
+ // Note: chand->lb_policy may have changed since we started our pick,
+ // in which case we will be cancelling the pick on a policy other than
+ // the one we started it on. However, this will just be a no-op.
+ if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
- chand, calld, calld->lb_policy);
+ chand, calld, chand->lb_policy);
}
- grpc_lb_policy_cancel_pick_locked(
- calld->lb_policy, &calld->connected_subchannel, GRPC_ERROR_REF(error));
+ grpc_lb_policy_cancel_pick_locked(chand->lb_policy, &calld->pick,
+ GRPC_ERROR_REF(error));
}
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
}
@@ -1092,9 +1094,6 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
chand, calld);
}
- GPR_ASSERT(calld->lb_policy != nullptr);
- GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel");
- calld->lb_policy = nullptr;
async_pick_done_locked(elem, GRPC_ERROR_REF(error));
}
@@ -1128,26 +1127,21 @@ static bool pick_callback_start_locked(grpc_call_element* elem) {
initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
}
}
- const grpc_lb_policy_pick_args inputs = {
+ calld->pick.initial_metadata =
calld->initial_metadata_batch->payload->send_initial_metadata
- .send_initial_metadata,
- initial_metadata_flags, &calld->lb_token_mdelem};
- // Keep a ref to the LB policy in calld while the pick is pending.
- GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
- calld->lb_policy = chand->lb_policy;
+ .send_initial_metadata;
+ calld->pick.initial_metadata_flags = initial_metadata_flags;
GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
grpc_combiner_scheduler(chand->combiner));
- const bool pick_done = grpc_lb_policy_pick_locked(
- chand->lb_policy, &inputs, &calld->connected_subchannel,
- calld->subchannel_call_context, nullptr, &calld->lb_pick_closure);
+ calld->pick.on_complete = &calld->lb_pick_closure;
+ const bool pick_done =
+ grpc_lb_policy_pick_locked(chand->lb_policy, &calld->pick);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
chand, calld);
}
- GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel");
- calld->lb_policy = nullptr;
} else {
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
grpc_call_combiner_set_notify_on_cancel(
@@ -1289,7 +1283,7 @@ static void start_pick_locked(void* arg, grpc_error* ignored) {
grpc_call_element* elem = (grpc_call_element*)arg;
call_data* calld = (call_data*)elem->call_data;
channel_data* chand = (channel_data*)elem->channel_data;
- GPR_ASSERT(calld->connected_subchannel == nullptr);
+ GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
if (chand->lb_policy != nullptr) {
// We already have an LB policy, so ask it for a pick.
if (pick_callback_start_locked(elem)) {
@@ -1467,15 +1461,14 @@ static void cc_destroy_call_elem(grpc_call_element* elem,
GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
"client_channel_destroy_call");
}
- GPR_ASSERT(calld->lb_policy == nullptr);
GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
- if (calld->connected_subchannel != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->connected_subchannel, "picked");
+ if (calld->pick.connected_subchannel != nullptr) {
+ calld->pick.connected_subchannel.reset();
}
for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
- if (calld->subchannel_call_context[i].value != nullptr) {
- calld->subchannel_call_context[i].destroy(
- calld->subchannel_call_context[i].value);
+ if (calld->pick.subchannel_call_context[i].value != nullptr) {
+ calld->pick.subchannel_call_context[i].destroy(
+ calld->pick.subchannel_call_context[i].value);
}
}
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
index 556a3bc6a1..6bfd038887 100644
--- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -30,11 +30,11 @@
#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker_registry.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/support/env.h"
-#include "src/core/lib/support/string.h"
typedef struct http_connect_handshaker {
// Base class. Must be first.
diff --git a/src/core/ext/filters/client_channel/http_proxy.cc b/src/core/ext/filters/client_channel/http_proxy.cc
index 2eafeee702..037c65822a 100644
--- a/src/core/ext/filters/client_channel/http_proxy.cc
+++ b/src/core/ext/filters/client_channel/http_proxy.cc
@@ -30,9 +30,9 @@
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/slice/b64.h"
-#include "src/core/lib/support/env.h"
-#include "src/core/lib/support/string.h"
/**
* Parses the 'http_proxy' env var and returns the proxy hostname to resolve or
diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc
index 7a5a8dec34..cc4fe7ec62 100644
--- a/src/core/ext/filters/client_channel/lb_policy.cc
+++ b/src/core/ext/filters/client_channel/lb_policy.cc
@@ -19,8 +19,6 @@
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/lib/iomgr/combiner.h"
-#define WEAK_REF_BITS 16
-
grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(
false, "lb_policy_refcount");
@@ -28,91 +26,60 @@ void grpc_lb_policy_init(grpc_lb_policy* policy,
const grpc_lb_policy_vtable* vtable,
grpc_combiner* combiner) {
policy->vtable = vtable;
- gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
+ gpr_ref_init(&policy->refs, 1);
policy->interested_parties = grpc_pollset_set_create();
policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy");
}
#ifndef NDEBUG
-#define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason
-#define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char* purpose
-#define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason
-#define REF_MUTATE_PASS_ARGS(purpose) , file, line, reason, purpose
+void grpc_lb_policy_ref(grpc_lb_policy* lb_policy, const char* file, int line,
+ const char* reason) {
+ if (grpc_trace_lb_policy_refcount.enabled()) {
+ gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "LB_POLICY:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy,
+ old_refs, old_refs + 1, reason);
+ }
#else
-#define REF_FUNC_EXTRA_ARGS
-#define REF_MUTATE_EXTRA_ARGS
-#define REF_FUNC_PASS_ARGS(new_reason)
-#define REF_MUTATE_PASS_ARGS(x)
+void grpc_lb_policy_ref(grpc_lb_policy* lb_policy) {
#endif
+ gpr_ref(&lb_policy->refs);
+}
-static gpr_atm ref_mutate(grpc_lb_policy* c, gpr_atm delta,
- int barrier REF_MUTATE_EXTRA_ARGS) {
- gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
- : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifndef NDEBUG
+void grpc_lb_policy_unref(grpc_lb_policy* lb_policy, const char* file, int line,
+ const char* reason) {
if (grpc_trace_lb_policy_refcount.enabled()) {
+ gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "LB_POLICY: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c,
- purpose, old_val, old_val + delta, reason);
+ "LB_POLICY:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy,
+ old_refs, old_refs - 1, reason);
}
+#else
+void grpc_lb_policy_unref(grpc_lb_policy* lb_policy) {
#endif
- return old_val;
-}
-
-void grpc_lb_policy_ref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) {
- ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF"));
-}
-
-static void shutdown_locked(void* arg, grpc_error* error) {
- grpc_lb_policy* policy = (grpc_lb_policy*)arg;
- policy->vtable->shutdown_locked(policy);
- GRPC_LB_POLICY_WEAK_UNREF(policy, "strong-unref");
-}
-
-void grpc_lb_policy_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) {
- gpr_atm old_val =
- ref_mutate(policy, (gpr_atm)1 - (gpr_atm)(1 << WEAK_REF_BITS),
- 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF"));
- gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
- gpr_atm check = 1 << WEAK_REF_BITS;
- if ((old_val & mask) == check) {
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_CREATE(shutdown_locked, policy,
- grpc_combiner_scheduler(policy->combiner)),
- GRPC_ERROR_NONE);
- } else {
- grpc_lb_policy_weak_unref(policy REF_FUNC_PASS_ARGS("strong-unref"));
+ if (gpr_unref(&lb_policy->refs)) {
+ grpc_pollset_set_destroy(lb_policy->interested_parties);
+ grpc_combiner* combiner = lb_policy->combiner;
+ lb_policy->vtable->destroy(lb_policy);
+ GRPC_COMBINER_UNREF(combiner, "lb_policy");
}
}
-void grpc_lb_policy_weak_ref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) {
- ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF"));
-}
-
-void grpc_lb_policy_weak_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) {
- gpr_atm old_val =
- ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
- if (old_val == 1) {
- grpc_pollset_set_destroy(policy->interested_parties);
- grpc_combiner* combiner = policy->combiner;
- policy->vtable->destroy(policy);
- GRPC_COMBINER_UNREF(combiner, "lb_policy");
- }
+void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
+ grpc_lb_policy* new_policy) {
+ policy->vtable->shutdown_locked(policy, new_policy);
}
int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
- const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
- grpc_call_context_element* context,
- void** user_data, grpc_closure* on_complete) {
- return policy->vtable->pick_locked(policy, pick_args, target, context,
- user_data, on_complete);
+ grpc_lb_policy_pick_state* pick) {
+ return policy->vtable->pick_locked(policy, pick);
}
void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
- grpc_connected_subchannel** target,
+ grpc_lb_policy_pick_state* pick,
grpc_error* error) {
- policy->vtable->cancel_pick_locked(policy, target, error);
+ policy->vtable->cancel_pick_locked(policy, pick, error);
}
void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy,
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 3572c97ed1..e19726efb3 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -20,6 +20,7 @@
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/gpr++/ref_counted_ptr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -33,7 +34,7 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount;
struct grpc_lb_policy {
const grpc_lb_policy_vtable* vtable;
- gpr_atm ref_pair;
+ gpr_refcount refs;
/* owned pointer to interested parties in load balancing decisions */
grpc_pollset_set* interested_parties;
/* combiner under which lb_policy actions take place */
@@ -42,32 +43,42 @@ struct grpc_lb_policy {
grpc_closure* request_reresolution;
};
-/** Extra arguments for an LB pick */
-typedef struct grpc_lb_policy_pick_args {
- /** Initial metadata associated with the picking call. */
+/// State used for an LB pick.
+typedef struct grpc_lb_policy_pick_state {
+ /// Initial metadata associated with the picking call.
grpc_metadata_batch* initial_metadata;
- /** Bitmask used for selective cancelling. See \a
- * grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in
- * grpc_types.h */
+ /// Bitmask used for selective cancelling. See \a
+ /// grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in
+ /// grpc_types.h.
uint32_t initial_metadata_flags;
- /** Storage for LB token in \a initial_metadata, or NULL if not used */
- grpc_linked_mdelem* lb_token_mdelem_storage;
-} grpc_lb_policy_pick_args;
+ /// Storage for LB token in \a initial_metadata, or NULL if not used.
+ grpc_linked_mdelem lb_token_mdelem_storage;
+ /// Closure to run when pick is complete, if not completed synchronously.
+ grpc_closure* on_complete;
+ /// Will be set to the selected subchannel, or nullptr on failure or when
+ /// the LB policy decides to drop the call.
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
+ /// Will be populated with context to pass to the subchannel call, if needed.
+ grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
+ /// Upon success, \a *user_data will be set to whatever opaque information
+ /// may need to be propagated from the LB policy, or NULL if not needed.
+ void** user_data;
+ /// Next pointer. For internal use by LB policy.
+ struct grpc_lb_policy_pick_state* next;
+} grpc_lb_policy_pick_state;
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_lb_policy* policy);
- void (*shutdown_locked)(grpc_lb_policy* policy);
+
+ /// \see grpc_lb_policy_shutdown_locked().
+ void (*shutdown_locked)(grpc_lb_policy* policy, grpc_lb_policy* new_policy);
/** \see grpc_lb_policy_pick */
- int (*pick_locked)(grpc_lb_policy* policy,
- const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
- grpc_call_context_element* context, void** user_data,
- grpc_closure* on_complete);
+ int (*pick_locked)(grpc_lb_policy* policy, grpc_lb_policy_pick_state* pick);
/** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick_locked)(grpc_lb_policy* policy,
- grpc_connected_subchannel** target,
+ grpc_lb_policy_pick_state* pick,
grpc_error* error);
/** \see grpc_lb_policy_cancel_picks */
@@ -103,37 +114,19 @@ struct grpc_lb_policy_vtable {
};
#ifndef NDEBUG
-
-/* Strong references: the policy will shutdown when they reach zero */
#define GRPC_LB_POLICY_REF(p, r) \
grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
#define GRPC_LB_POLICY_UNREF(p, r) \
grpc_lb_policy_unref((p), __FILE__, __LINE__, (r))
-
-/* Weak references: they don't prevent the shutdown of the LB policy. When no
- * strong references are left but there are still weak ones, shutdown is called.
- * Once the weak reference also reaches zero, the LB policy is destroyed. */
-#define GRPC_LB_POLICY_WEAK_REF(p, r) \
- grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_LB_POLICY_WEAK_UNREF(p, r) \
- grpc_lb_policy_weak_unref((p), __FILE__, __LINE__, (r))
void grpc_lb_policy_ref(grpc_lb_policy* policy, const char* file, int line,
const char* reason);
void grpc_lb_policy_unref(grpc_lb_policy* policy, const char* file, int line,
const char* reason);
-void grpc_lb_policy_weak_ref(grpc_lb_policy* policy, const char* file, int line,
- const char* reason);
-void grpc_lb_policy_weak_unref(grpc_lb_policy* policy, const char* file,
- int line, const char* reason);
-#else
+#else // !NDEBUG
#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p))
-#define GRPC_LB_POLICY_WEAK_REF(p, r) grpc_lb_policy_weak_ref((p))
-#define GRPC_LB_POLICY_WEAK_UNREF(p, r) grpc_lb_policy_weak_unref((p))
void grpc_lb_policy_ref(grpc_lb_policy* policy);
void grpc_lb_policy_unref(grpc_lb_policy* policy);
-void grpc_lb_policy_weak_ref(grpc_lb_policy* policy);
-void grpc_lb_policy_weak_unref(grpc_lb_policy* policy);
#endif
/** called by concrete implementations to initialize the base struct */
@@ -141,40 +134,37 @@ void grpc_lb_policy_init(grpc_lb_policy* policy,
const grpc_lb_policy_vtable* vtable,
grpc_combiner* combiner);
-/** Finds an appropriate subchannel for a call, based on \a pick_args.
-
- \a target will be set to the selected subchannel, or NULL on failure
- or when the LB policy decides to drop the call.
+/// Shuts down \a policy.
+/// If \a new_policy is non-null, any pending picks will be restarted
+/// on that policy; otherwise, they will be failed.
+void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
+ grpc_lb_policy* new_policy);
- Upon success, \a user_data will be set to whatever opaque information
- may need to be propagated from the LB policy, or NULL if not needed.
- \a context will be populated with context to pass to the subchannel
- call, if needed.
+/** Finds an appropriate subchannel for a call, based on data in \a pick.
+ \a pick must remain alive until the pick is complete.
If the pick succeeds and a result is known immediately, a non-zero
- value will be returned. Otherwise, \a on_complete will be invoked
+ value will be returned. Otherwise, \a pick->on_complete will be invoked
once the pick is complete with its error argument set to indicate
success or failure.
Any IO should be done under the \a interested_parties \a grpc_pollset_set
in the \a grpc_lb_policy struct. */
int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
- const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
- grpc_call_context_element* context,
- void** user_data, grpc_closure* on_complete);
+ grpc_lb_policy_pick_state* pick);
-/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
+/** Perform a connected subchannel ping (see \a
+ grpc_core::ConnectedSubchannel::Ping)
against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy,
grpc_closure* on_initiate,
grpc_closure* on_ack);
-/** Cancel picks for \a target.
+/** Cancel picks for \a pick.
The \a on_complete callback of the pending picks will be invoked with \a
*target set to NULL. */
void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
- grpc_connected_subchannel** target,
+ grpc_lb_policy_pick_state* pick,
grpc_error* error);
/** Cancel all pending picks for which their \a initial_metadata_flags (as given
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
index 3eedb08ecc..1708d81e61 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
@@ -32,7 +32,8 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
static void destroy_channel_elem(grpc_channel_element* elem) {}
-typedef struct {
+namespace {
+struct call_data {
// Stats object to update.
grpc_grpclb_client_stats* client_stats;
// State for intercepting send_initial_metadata.
@@ -43,7 +44,8 @@ typedef struct {
grpc_closure recv_initial_metadata_ready;
grpc_closure* original_recv_initial_metadata_ready;
bool recv_initial_metadata_succeeded;
-} call_data;
+};
+} // namespace
static void on_complete_for_send(void* arg, grpc_error* error) {
call_data* calld = (call_data*)arg;
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 3c64213fb9..6c29cd8218 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -54,7 +54,7 @@
* operations in progress over the old RR instance. This is done by
* decreasing the reference count on the old policy. The moment no more
* references are held on the old RR policy, it'll be destroyed and \a
- * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
+ * on_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
* state. At this point we can transition to a new RR instance safely, which
* is done once again via \a rr_handover_locked().
*
@@ -106,6 +106,8 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/gpr++/manual_constructor.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -113,13 +115,11 @@
#include "src/core/lib/slice/slice_hash_table.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
-#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/static_metadata.h"
-#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
@@ -128,185 +128,48 @@
grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb");
-/* add lb_token of selected subchannel (address) to the call's initial
- * metadata */
-static grpc_error* initial_metadata_add_lb_token(
- grpc_metadata_batch* initial_metadata,
- grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
- GPR_ASSERT(lb_token_mdelem_storage != nullptr);
- GPR_ASSERT(!GRPC_MDISNULL(lb_token));
- return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
- lb_token);
-}
-
-static void destroy_client_stats(void* arg) {
- grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg);
-}
-
-typedef struct wrapped_rr_closure_arg {
- /* the closure instance using this struct as argument */
- grpc_closure wrapper_closure;
-
- /* the original closure. Usually a on_complete/notify cb for pick() and ping()
- * calls against the internal RR instance, respectively. */
- grpc_closure* wrapped_closure;
-
- /* the pick's initial metadata, kept in order to append the LB token for the
- * pick */
- grpc_metadata_batch* initial_metadata;
-
- /* the picked target, used to determine which LB token to add to the pick's
- * initial metadata */
- grpc_connected_subchannel** target;
-
- /* the context to be populated for the subchannel call */
- grpc_call_context_element* context;
-
- /* Stats for client-side load reporting. Note that this holds a
- * reference, which must be either passed on via context or unreffed. */
+struct glb_lb_policy;
+
+namespace {
+
+/// Linked list of pending pick requests. It stores all information needed to
+/// eventually call (Round Robin's) pick() on them. They mainly stay pending
+/// waiting for the RR policy to be created.
+///
+/// Note that when a pick is sent to the RR policy, we inject our own
+/// on_complete callback, so that we can intercept the result before
+/// invoking the original on_complete callback. This allows us to set the
+/// LB token metadata and add client_stats to the call context.
+/// See \a pending_pick_complete() for details.
+struct pending_pick {
+ // Our on_complete closure and the original one.
+ grpc_closure on_complete;
+ grpc_closure* original_on_complete;
+ // The original pick.
+ grpc_lb_policy_pick_state* pick;
+ // Stats for client-side load reporting. Note that this holds a
+ // reference, which must be either passed on via context or unreffed.
grpc_grpclb_client_stats* client_stats;
-
- /* the LB token associated with the pick */
+ // The LB token associated with the pick. This is set via user_data in
+ // the pick.
grpc_mdelem lb_token;
-
- /* storage for the lb token initial metadata mdelem */
- grpc_linked_mdelem* lb_token_mdelem_storage;
-
- /* The RR instance related to the closure */
- grpc_lb_policy* rr_policy;
-
- /* The grpclb instance that created the wrapping. This instance is not owned,
- * reference counts are untouched. It's used only for logging purposes. */
- grpc_lb_policy* glb_policy;
-
- /* heap memory to be freed upon closure execution. */
- void* free_when_done;
-} wrapped_rr_closure_arg;
-
-/* The \a on_complete closure passed as part of the pick requires keeping a
- * reference to its associated round robin instance. We wrap this closure in
- * order to unref the round robin instance upon its invocation */
-static void wrapped_rr_closure(void* arg, grpc_error* error) {
- wrapped_rr_closure_arg* wc_arg = (wrapped_rr_closure_arg*)arg;
-
- GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
- GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
-
- if (wc_arg->rr_policy != nullptr) {
- /* if *target is nullptr, no pick has been made by the RR policy (eg, all
- * addresses failed to connect). There won't be any user_data/token
- * available */
- if (*wc_arg->target != nullptr) {
- if (!GRPC_MDISNULL(wc_arg->lb_token)) {
- initial_metadata_add_lb_token(wc_arg->initial_metadata,
- wc_arg->lb_token_mdelem_storage,
- GRPC_MDELEM_REF(wc_arg->lb_token));
- } else {
- gpr_log(
- GPR_ERROR,
- "[grpclb %p] No LB token for connected subchannel pick %p (from RR "
- "instance %p).",
- wc_arg->glb_policy, *wc_arg->target, wc_arg->rr_policy);
- abort();
- }
- // Pass on client stats via context. Passes ownership of the reference.
- GPR_ASSERT(wc_arg->client_stats != nullptr);
- wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
- wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
- } else {
- grpc_grpclb_client_stats_unref(wc_arg->client_stats);
- }
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy,
- wc_arg->rr_policy);
- }
- GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure");
- }
- GPR_ASSERT(wc_arg->free_when_done != nullptr);
- gpr_free(wc_arg->free_when_done);
-}
-
-/* Linked list of pending pick requests. It stores all information needed to
- * eventually call (Round Robin's) pick() on them. They mainly stay pending
- * waiting for the RR policy to be created/updated.
- *
- * One particularity is the wrapping of the user-provided \a on_complete closure
- * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
- * order to correctly unref the RR policy instance upon completion of the pick.
- * See \a wrapped_rr_closure for details. */
-typedef struct pending_pick {
+ // The grpclb instance that created the wrapping. This instance is not owned,
+ // reference counts are untouched. It's used only for logging purposes.
+ glb_lb_policy* glb_policy;
+ // Next pending pick.
struct pending_pick* next;
+};
- /* original pick()'s arguments */
- grpc_lb_policy_pick_args pick_args;
-
- /* output argument where to store the pick()ed connected subchannel, or
- * nullptr upon error. */
- grpc_connected_subchannel** target;
-
- /* args for wrapped_on_complete */
- wrapped_rr_closure_arg wrapped_on_complete_arg;
-} pending_pick;
-
-static void add_pending_pick(pending_pick** root,
- const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
- grpc_call_context_element* context,
- grpc_closure* on_complete) {
- pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp));
- pp->next = *root;
- pp->pick_args = *pick_args;
- pp->target = target;
- pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
- pp->wrapped_on_complete_arg.target = target;
- pp->wrapped_on_complete_arg.context = context;
- pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
- pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
- pick_args->lb_token_mdelem_storage;
- pp->wrapped_on_complete_arg.free_when_done = pp;
- GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure,
- wrapped_rr_closure, &pp->wrapped_on_complete_arg,
- grpc_schedule_on_exec_ctx);
- *root = pp;
-}
-
-/* Same as the \a pending_pick struct but for ping operations */
-typedef struct pending_ping {
+/// A linked list of pending pings waiting for the RR policy to be created.
+struct pending_ping {
+ grpc_closure* on_initiate;
+ grpc_closure* on_ack;
struct pending_ping* next;
+};
- /* args for sending the ping */
- wrapped_rr_closure_arg* on_initiate;
- wrapped_rr_closure_arg* on_ack;
-} pending_ping;
-
-static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
- if (on_initiate != nullptr) {
- pping->on_initiate =
- (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate));
- pping->on_initiate->wrapped_closure = on_initiate;
- pping->on_initiate->free_when_done = pping->on_initiate;
- GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure,
- &pping->on_initiate, grpc_schedule_on_exec_ctx);
- }
- if (on_ack != nullptr) {
- pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack));
- pping->on_ack->wrapped_closure = on_ack;
- pping->on_ack->free_when_done = pping->on_ack;
- GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure,
- &pping->on_ack, grpc_schedule_on_exec_ctx);
- }
- pping->next = *root;
- *root = pping;
-}
+} // namespace
-/*
- * glb_lb_policy
- */
-typedef struct rr_connectivity_data rr_connectivity_data;
-
-typedef struct glb_lb_policy {
+struct glb_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
@@ -331,6 +194,9 @@ typedef struct glb_lb_policy {
/** the RR policy to use of the backend servers returned by the LB server */
grpc_lb_policy* rr_policy;
+ grpc_closure on_rr_connectivity_changed;
+ grpc_connectivity_state rr_connectivity_state;
+
bool started_picking;
/** our connectivity state tracker */
@@ -365,11 +231,11 @@ typedef struct glb_lb_policy {
/** are we already watching the LB channel's connectivity? */
bool watching_lb_channel;
- /** is \a lb_call_retry_timer active? */
- bool retry_timer_active;
+ /** is the callback associated with \a lb_call_retry_timer pending? */
+ bool retry_timer_callback_pending;
- /** is \a lb_fallback_timer active? */
- bool fallback_timer_active;
+ /** is the callback associated with \a lb_fallback_timer pending? */
+ bool fallback_timer_callback_pending;
/** called upon changes to the LB channel's connectivity. */
grpc_closure lb_channel_on_connectivity_changed;
@@ -377,6 +243,9 @@ typedef struct glb_lb_policy {
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
+ /* Finished sending initial request. */
+ grpc_closure lb_on_sent_initial_request;
+
/* Status from the LB server has been received. This signals the end of the LB
* call. */
grpc_closure lb_on_server_status_received;
@@ -408,7 +277,7 @@ typedef struct glb_lb_policy {
grpc_slice lb_call_status_details;
/** LB call retry backoff state */
- grpc_backoff lb_call_backoff_state;
+ grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff;
/** LB call retry timer */
grpc_timer lb_call_retry_timer;
@@ -416,6 +285,7 @@ typedef struct glb_lb_policy {
/** LB fallback timer */
grpc_timer lb_fallback_timer;
+ bool initial_request_sent;
bool seen_initial_response;
/* Stats for client-side load reporting. Should be unreffed and
@@ -424,22 +294,94 @@ typedef struct glb_lb_policy {
/* Interval and timer for next client load report. */
grpc_millis client_stats_report_interval;
grpc_timer client_load_report_timer;
- bool client_load_report_timer_pending;
+ bool client_load_report_timer_callback_pending;
bool last_client_load_report_counters_were_zero;
/* Closure used for either the load report timer or the callback for
* completion of sending the load report. */
grpc_closure client_load_report_closure;
/* Client load report message payload. */
grpc_byte_buffer* client_load_report_payload;
-} glb_lb_policy;
-
-/* Keeps track and reacts to changes in connectivity of the RR instance */
-struct rr_connectivity_data {
- grpc_closure on_change;
- grpc_connectivity_state state;
- glb_lb_policy* glb_policy;
};
+/* add lb_token of selected subchannel (address) to the call's initial
+ * metadata */
+static grpc_error* initial_metadata_add_lb_token(
+ grpc_metadata_batch* initial_metadata,
+ grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
+ GPR_ASSERT(lb_token_mdelem_storage != nullptr);
+ GPR_ASSERT(!GRPC_MDISNULL(lb_token));
+ return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+ lb_token);
+}
+
+static void destroy_client_stats(void* arg) {
+ grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg);
+}
+
+static void pending_pick_set_metadata_and_context(pending_pick* pp) {
+ /* if connected_subchannel is nullptr, no pick has been made by the RR
+ * policy (e.g., all addresses failed to connect). There won't be any
+ * user_data/token available */
+ if (pp->pick->connected_subchannel != nullptr) {
+ if (!GRPC_MDISNULL(pp->lb_token)) {
+ initial_metadata_add_lb_token(pp->pick->initial_metadata,
+ &pp->pick->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(pp->lb_token));
+ } else {
+ gpr_log(GPR_ERROR,
+ "[grpclb %p] No LB token for connected subchannel pick %p",
+ pp->glb_policy, pp->pick);
+ abort();
+ }
+ // Pass on client stats via context. Passes ownership of the reference.
+ GPR_ASSERT(pp->client_stats != nullptr);
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
+ pp->client_stats;
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
+ destroy_client_stats;
+ } else {
+ if (pp->client_stats != nullptr) {
+ grpc_grpclb_client_stats_unref(pp->client_stats);
+ }
+ }
+}
+
+/* The \a on_complete closure passed as part of the pick requires keeping a
+ * reference to its associated round robin instance. We wrap this closure in
+ * order to unref the round robin instance upon its invocation */
+static void pending_pick_complete(void* arg, grpc_error* error) {
+ pending_pick* pp = (pending_pick*)arg;
+ pending_pick_set_metadata_and_context(pp);
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
+ gpr_free(pp);
+}
+
+static pending_pick* pending_pick_create(glb_lb_policy* glb_policy,
+ grpc_lb_policy_pick_state* pick) {
+ pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp));
+ pp->pick = pick;
+ pp->glb_policy = glb_policy;
+ GRPC_CLOSURE_INIT(&pp->on_complete, pending_pick_complete, pp,
+ grpc_schedule_on_exec_ctx);
+ pp->original_on_complete = pick->on_complete;
+ pp->pick->on_complete = &pp->on_complete;
+ return pp;
+}
+
+static void pending_pick_add(pending_pick** root, pending_pick* new_pp) {
+ new_pp->next = *root;
+ *root = new_pp;
+}
+
+static void pending_ping_add(pending_ping** root, grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
+ pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
+ pping->on_initiate = on_initiate;
+ pping->on_ack = on_ack;
+ pping->next = *root;
+ *root = pping;
+}
+
static bool is_server_valid(const grpc_grpclb_server* server, size_t idx,
bool log) {
if (server->drop) return false;
@@ -551,7 +493,6 @@ static grpc_lb_addresses* process_serverlist_locked(
gpr_free(uri);
user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
}
-
grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
false /* is_balancer */,
nullptr /* balancer_name */, user_data);
@@ -592,7 +533,6 @@ static void update_lb_connectivity_status_locked(
grpc_error* rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
-
/* The new connectivity status is a function of the previous one and the new
* input coming from the status of the RR policy.
*
@@ -622,7 +562,6 @@ static void update_lb_connectivity_status_locked(
*
* (*) This function mustn't be called during shutting down. */
GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
-
switch (rr_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN:
@@ -633,7 +572,6 @@ static void update_lb_connectivity_status_locked(
case GRPC_CHANNEL_READY:
GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
}
-
if (grpc_lb_glb_trace.enabled()) {
gpr_log(
GPR_INFO,
@@ -651,10 +589,8 @@ static void update_lb_connectivity_status_locked(
* cleanups this callback would otherwise be responsible for.
* If \a force_async is true, then we will manually schedule the
* completion callback even if the pick is available immediately. */
-static bool pick_from_internal_rr_locked(
- glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args,
- bool force_async, grpc_connected_subchannel** target,
- wrapped_rr_closure_arg* wc_arg) {
+static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
+ bool force_async, pending_pick* pp) {
// Check for drops if we are not using fallback backend addresses.
if (glb_policy->serverlist != nullptr) {
// Look at the index into the serverlist to see if we should drop this call.
@@ -664,57 +600,36 @@ static bool pick_from_internal_rr_locked(
glb_policy->serverlist_index = 0; // Wrap-around.
}
if (server->drop) {
- // Not using the RR policy, so unref it.
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy,
- wc_arg->rr_policy);
- }
- GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
// Update client load reporting stats to indicate the number of
// dropped calls. Note that we have to do this here instead of in
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
- GPR_ASSERT(wc_arg->client_stats != nullptr);
+ GPR_ASSERT(glb_policy->client_stats != nullptr);
grpc_grpclb_client_stats_add_call_dropped_locked(
- server->load_balance_token, wc_arg->client_stats);
- grpc_grpclb_client_stats_unref(wc_arg->client_stats);
+ server->load_balance_token, glb_policy->client_stats);
if (force_async) {
- GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
- GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE);
- gpr_free(wc_arg->free_when_done);
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
+ gpr_free(pp);
return false;
}
- gpr_free(wc_arg->free_when_done);
+ gpr_free(pp);
return true;
}
}
+ // Set client_stats and user_data.
+ pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats);
+ GPR_ASSERT(pp->pick->user_data == nullptr);
+ pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy.
- const bool pick_done = grpc_lb_policy_pick_locked(
- wc_arg->rr_policy, pick_args, target, wc_arg->context,
- (void**)&wc_arg->lb_token, &wc_arg->wrapper_closure);
+ bool pick_done = grpc_lb_policy_pick_locked(glb_policy->rr_policy, pp->pick);
if (pick_done) {
- /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy,
- wc_arg->rr_policy);
- }
- GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
- /* add the load reporting initial metadata */
- initial_metadata_add_lb_token(pick_args->initial_metadata,
- pick_args->lb_token_mdelem_storage,
- GRPC_MDELEM_REF(wc_arg->lb_token));
- // Pass on client stats via context. Passes ownership of the reference.
- GPR_ASSERT(wc_arg->client_stats != nullptr);
- wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
- wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
+ pending_pick_set_metadata_and_context(pp);
if (force_async) {
- GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
- GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE);
- gpr_free(wc_arg->free_when_done);
- return false;
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
+ pick_done = false;
}
- gpr_free(wc_arg->free_when_done);
+ gpr_free(pp);
}
/* else, the pending pick will be registered and taken care of by the
* pending pick list inside the RR policy (glb_policy->rr_policy).
@@ -756,7 +671,7 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) {
gpr_free(args);
}
-static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error);
+static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error);
static void create_rr_locked(glb_lb_policy* glb_policy,
grpc_lb_policy_args* args) {
GPR_ASSERT(glb_policy->rr_policy == nullptr);
@@ -778,72 +693,46 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
glb_policy->base.request_reresolution = nullptr;
glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr;
- const grpc_connectivity_state rr_state =
- grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy,
- &rr_state_error);
+ glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked(
+ glb_policy->rr_policy, &rr_state_error);
/* Connectivity state is a function of the RR policy updated/created */
- update_lb_connectivity_status_locked(glb_policy, rr_state, rr_state_error);
+ update_lb_connectivity_status_locked(
+ glb_policy, glb_policy->rr_connectivity_state, rr_state_error);
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */
grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties);
-
- /* Allocate the data for the tracking of the new RR policy's connectivity.
- * It'll be deallocated in glb_rr_connectivity_changed() */
- rr_connectivity_data* rr_connectivity =
- (rr_connectivity_data*)gpr_zalloc(sizeof(rr_connectivity_data));
- GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
- glb_rr_connectivity_changed_locked, rr_connectivity,
+ GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed,
+ on_rr_connectivity_changed_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
- rr_connectivity->glb_policy = glb_policy;
- rr_connectivity->state = rr_state;
-
/* Subscribe to changes to the connectivity of the new RR */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
- grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy,
- &rr_connectivity->state,
- &rr_connectivity->on_change);
+ GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb");
+ grpc_lb_policy_notify_on_state_change_locked(
+ glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
+ &glb_policy->on_rr_connectivity_changed);
grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
-
- /* Update picks and pings in wait */
+ // Send pending picks to RR policy.
pending_pick* pp;
while ((pp = glb_policy->pending_picks)) {
glb_policy->pending_picks = pp->next;
- GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
- pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
- pp->wrapped_on_complete_arg.client_stats =
- grpc_grpclb_client_stats_ref(glb_policy->client_stats);
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Pending pick about to (async) PICK from RR %p",
glb_policy, glb_policy->rr_policy);
}
- pick_from_internal_rr_locked(glb_policy, &pp->pick_args,
- true /* force_async */, pp->target,
- &pp->wrapped_on_complete_arg);
+ pick_from_internal_rr_locked(glb_policy, true /* force_async */, pp);
}
-
+ // Send pending pings to RR policy.
pending_ping* pping;
while ((pping = glb_policy->pending_pings)) {
glb_policy->pending_pings = pping->next;
- grpc_closure* on_initiate = nullptr;
- grpc_closure* on_ack = nullptr;
- if (pping->on_initiate != nullptr) {
- GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
- pping->on_initiate->rr_policy = glb_policy->rr_policy;
- on_initiate = &pping->on_initiate->wrapper_closure;
- }
- if (pping->on_ack != nullptr) {
- GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
- pping->on_ack->rr_policy = glb_policy->rr_policy;
- on_ack = &pping->on_ack->wrapper_closure;
- }
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
glb_policy, glb_policy->rr_policy);
}
- grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
+ grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, pping->on_initiate,
+ pping->on_ack);
gpr_free(pping);
}
}
@@ -869,31 +758,28 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) {
lb_policy_args_destroy(args);
}
-static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
- rr_connectivity_data* rr_connectivity = (rr_connectivity_data*)arg;
- glb_lb_policy* glb_policy = rr_connectivity->glb_policy;
+static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (glb_policy->shutting_down) {
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
- gpr_free(rr_connectivity);
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
return;
}
- if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) {
+ if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* An RR policy that has transitioned into the SHUTDOWN connectivity state
* should not be considered for picks or updates: the SHUTDOWN state is a
* sink, policies can't transition back from it. .*/
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown");
glb_policy->rr_policy = nullptr;
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
- gpr_free(rr_connectivity);
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
return;
}
/* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
- update_lb_connectivity_status_locked(glb_policy, rr_connectivity->state,
- GRPC_ERROR_REF(error));
- /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
- grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy,
- &rr_connectivity->state,
- &rr_connectivity->on_change);
+ update_lb_connectivity_status_locked(
+ glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error));
+ /* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */
+ grpc_lb_policy_notify_on_state_change_locked(
+ glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
+ &glb_policy->on_rr_connectivity_changed);
}
static void destroy_balancer_name(void* balancer_name) {
@@ -1001,38 +887,27 @@ static void glb_destroy(grpc_lb_policy* pol) {
gpr_free(glb_policy);
}
-static void glb_shutdown_locked(grpc_lb_policy* pol) {
+static void glb_shutdown_locked(grpc_lb_policy* pol,
+ grpc_lb_policy* new_policy) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
glb_policy->shutting_down = true;
-
- /* We need a copy of the lb_call pointer because we can't cancell the call
- * while holding glb_policy->mu: lb_on_server_status_received, invoked due to
- * the cancel, needs to acquire that same lock */
- grpc_call* lb_call = glb_policy->lb_call;
-
/* glb_policy->lb_call and this local lb_call must be consistent at this point
* because glb_policy->lb_call is only assigned in lb_call_init_locked as part
* of query_for_backends_locked, which can only be invoked while
* glb_policy->shutting_down is false. */
- if (lb_call != nullptr) {
- grpc_call_cancel(lb_call, nullptr);
+ if (glb_policy->lb_call != nullptr) {
+ grpc_call_cancel(glb_policy->lb_call, nullptr);
/* lb_on_server_status_received will pick up the cancel and clean up */
}
- if (glb_policy->retry_timer_active) {
+ if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
- glb_policy->retry_timer_active = false;
}
- if (glb_policy->fallback_timer_active) {
+ if (glb_policy->fallback_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_fallback_timer);
- glb_policy->fallback_timer_active = false;
}
-
- pending_pick* pp = glb_policy->pending_picks;
- glb_policy->pending_picks = nullptr;
- pending_ping* pping = glb_policy->pending_pings;
- glb_policy->pending_pings = nullptr;
if (glb_policy->rr_policy != nullptr) {
+ grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr);
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
} else {
grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
@@ -1047,28 +922,35 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
}
grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "glb_shutdown");
-
+ // Clear pending picks.
+ pending_pick* pp = glb_policy->pending_picks;
+ glb_policy->pending_picks = nullptr;
while (pp != nullptr) {
pending_pick* next = pp->next;
- *pp->target = nullptr;
- GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_REF(error));
- gpr_free(pp);
+ if (new_policy != nullptr) {
+ // Hand pick over to new policy.
+ if (pp->client_stats != nullptr) {
+ grpc_grpclb_client_stats_unref(pp->client_stats);
+ }
+ pp->pick->on_complete = pp->original_on_complete;
+ if (grpc_lb_policy_pick_locked(new_policy, pp->pick)) {
+ // Synchronous return; schedule callback.
+ GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
+ }
+ gpr_free(pp);
+ } else {
+ pp->pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
+ }
pp = next;
}
-
+ // Clear pending pings.
+ pending_ping* pping = glb_policy->pending_pings;
+ glb_policy->pending_pings = nullptr;
while (pping != nullptr) {
pending_ping* next = pping->next;
- if (pping->on_initiate != nullptr) {
- GRPC_CLOSURE_SCHED(&pping->on_initiate->wrapper_closure,
- GRPC_ERROR_REF(error));
- gpr_free(pping->on_initiate);
- }
- if (pping->on_ack != nullptr) {
- GRPC_CLOSURE_SCHED(&pping->on_ack->wrapper_closure,
- GRPC_ERROR_REF(error));
- gpr_free(pping->on_ack);
- }
+ GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
gpr_free(pping);
pping = next;
}
@@ -1086,16 +968,16 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
// we invoke the completion closure and set *target to nullptr right here.
static void glb_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_lb_policy_pick_state* pick,
grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = nullptr;
while (pp != nullptr) {
pending_pick* next = pp->next;
- if (pp->target == target) {
- *target = nullptr;
- GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
+ if (pp->pick == pick) {
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(&pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
@@ -1105,7 +987,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol,
pp = next;
}
if (glb_policy->rr_policy != nullptr) {
- grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, target,
+ grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, pick,
GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
@@ -1130,9 +1012,9 @@ static void glb_cancel_picks_locked(grpc_lb_policy* pol,
glb_policy->pending_picks = nullptr;
while (pp != nullptr) {
pending_pick* next = pp->next;
- if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
+ if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(&pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
@@ -1154,20 +1036,21 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy);
static void start_picking_locked(glb_lb_policy* glb_policy) {
/* start a timer to fall back */
if (glb_policy->lb_fallback_timeout_ms > 0 &&
- glb_policy->serverlist == nullptr && !glb_policy->fallback_timer_active) {
+ glb_policy->serverlist == nullptr &&
+ !glb_policy->fallback_timer_callback_pending) {
grpc_millis deadline =
grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms;
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer");
+ GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_fallback_timer");
GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
- glb_policy->fallback_timer_active = true;
+ glb_policy->fallback_timer_callback_pending = true;
grpc_timer_init(&glb_policy->lb_fallback_timer, deadline,
&glb_policy->lb_on_fallback);
}
glb_policy->started_picking = true;
- grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
+ glb_policy->lb_call_backoff->Reset();
query_for_backends_locked(glb_policy);
}
@@ -1179,19 +1062,9 @@ static void glb_exit_idle_locked(grpc_lb_policy* pol) {
}
static int glb_pick_locked(grpc_lb_policy* pol,
- const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
- grpc_call_context_element* context, void** user_data,
- grpc_closure* on_complete) {
- if (pick_args->lb_token_mdelem_storage == nullptr) {
- *target = nullptr;
- GRPC_CLOSURE_SCHED(on_complete,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "No mdelem storage for the LB token. Load reporting "
- "won't work without it. Failing"));
- return 0;
- }
+ grpc_lb_policy_pick_state* pick) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
+ pending_pick* pp = pending_pick_create(glb_policy, pick);
bool pick_done = false;
if (glb_policy->rr_policy != nullptr) {
const grpc_connectivity_state rr_connectivity_state =
@@ -1199,7 +1072,7 @@ static int glb_pick_locked(grpc_lb_policy* pol,
nullptr);
// The glb_policy->rr_policy may have transitioned to SHUTDOWN but the
// callback registered to capture this event
- // (glb_rr_connectivity_changed_locked) may not have been invoked yet. We
+ // (on_rr_connectivity_changed_locked) may not have been invoked yet. We
// need to make sure we aren't trying to pick from a RR policy instance
// that's in shutdown.
if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
@@ -1209,32 +1082,16 @@ static int glb_pick_locked(grpc_lb_policy* pol,
glb_policy, glb_policy->rr_policy,
grpc_connectivity_state_name(rr_connectivity_state));
}
- add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
- on_complete);
+ pending_pick_add(&glb_policy->pending_picks, pp);
pick_done = false;
} else { // RR not in shutdown
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
glb_policy->rr_policy);
}
- GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
- wrapped_rr_closure_arg* wc_arg =
- (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(wrapped_rr_closure_arg));
- GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
- grpc_schedule_on_exec_ctx);
- wc_arg->rr_policy = glb_policy->rr_policy;
- wc_arg->target = target;
- wc_arg->context = context;
GPR_ASSERT(glb_policy->client_stats != nullptr);
- wc_arg->client_stats =
- grpc_grpclb_client_stats_ref(glb_policy->client_stats);
- wc_arg->wrapped_closure = on_complete;
- wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
- wc_arg->initial_metadata = pick_args->initial_metadata;
- wc_arg->free_when_done = wc_arg;
- wc_arg->glb_policy = pol;
- pick_done = pick_from_internal_rr_locked(
- glb_policy, pick_args, false /* force_async */, target, wc_arg);
+ pick_done =
+ pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp);
}
} else { // glb_policy->rr_policy == NULL
if (grpc_lb_glb_trace.enabled()) {
@@ -1242,8 +1099,7 @@ static int glb_pick_locked(grpc_lb_policy* pol,
"[grpclb %p] No RR policy. Adding to grpclb's pending picks",
glb_policy);
}
- add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
- on_complete);
+ pending_pick_add(&glb_policy->pending_picks, pp);
if (!glb_policy->started_picking) {
start_picking_locked(glb_policy);
}
@@ -1265,7 +1121,7 @@ static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
if (glb_policy->rr_policy) {
grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
} else {
- add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack);
+ pending_ping_add(&glb_policy->pending_pings, on_initiate, on_ack);
if (!glb_policy->started_picking) {
start_picking_locked(glb_policy);
}
@@ -1282,7 +1138,7 @@ static void glb_notify_on_state_change_locked(grpc_lb_policy* pol,
static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- glb_policy->retry_timer_active = false;
+ glb_policy->retry_timer_callback_pending = false;
if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr &&
error == GRPC_ERROR_NONE) {
if (grpc_lb_glb_trace.enabled()) {
@@ -1290,43 +1146,42 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
}
query_for_backends_locked(glb_policy);
}
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_retry_timer");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer");
}
static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
if (glb_policy->started_picking && glb_policy->updating_lb_call) {
- if (glb_policy->retry_timer_active) {
+ if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
}
if (!glb_policy->shutting_down) start_picking_locked(glb_policy);
glb_policy->updating_lb_call = false;
} else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
- grpc_millis next_try = grpc_backoff_step(&glb_policy->lb_call_backoff_state)
- .next_attempt_start_time;
+ grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime();
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
glb_policy);
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
if (timeout > 0) {
gpr_log(GPR_DEBUG,
- "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
+ "[grpclb %p] ... retry LB call after %" PRIuPTR "ms.",
glb_policy, timeout);
} else {
- gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
+ gpr_log(GPR_DEBUG, "[grpclb %p] ... retry LB call immediately.",
glb_policy);
}
}
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
+ GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer");
GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
lb_call_on_retry_timer_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
- glb_policy->retry_timer_active = true;
+ glb_policy->retry_timer_callback_pending = true;
grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry);
}
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
- "lb_on_server_status_received_locked");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base,
+ "lb_on_server_status_received_locked");
}
static void send_client_load_report_locked(void* arg, grpc_error* error);
@@ -1348,8 +1203,8 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) {
grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
glb_policy->client_load_report_payload = nullptr;
if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) {
- glb_policy->client_load_report_timer_pending = false;
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report");
+ glb_policy->client_load_report_timer_callback_pending = false;
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
if (glb_policy->lb_call == nullptr) {
maybe_restart_lb_call(glb_policy);
}
@@ -1358,6 +1213,22 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) {
schedule_next_client_load_report(glb_policy);
}
+static void do_send_client_load_report_locked(glb_lb_policy* glb_policy) {
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_SEND_MESSAGE;
+ op.data.send_message.send_message = glb_policy->client_load_report_payload;
+ GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
+ client_load_report_done_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
+ if (call_error != GRPC_CALL_OK) {
+ gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ }
+}
+
static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
grpc_grpclb_dropped_call_counts* drop_entries =
(grpc_grpclb_dropped_call_counts*)
@@ -1373,8 +1244,8 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
static void send_client_load_report_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) {
- glb_policy->client_load_report_timer_pending = false;
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report");
+ glb_policy->client_load_report_timer_callback_pending = false;
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
if (glb_policy->lb_call == nullptr) {
maybe_restart_lb_call(glb_policy);
}
@@ -1401,22 +1272,15 @@ static void send_client_load_report_locked(void* arg, grpc_error* error) {
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
- // Send load report message.
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message.send_message = glb_policy->client_load_report_payload;
- GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
- client_load_report_done_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
- if (call_error != GRPC_CALL_OK) {
- gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
+ // If we've already sent the initial request, then we can go ahead and send
+ // the load report. Otherwise, we need to wait until the initial request has
+ // been sent to send this (see lb_on_sent_initial_request_locked() below).
+ if (glb_policy->initial_request_sent) {
+ do_send_client_load_report_locked(glb_policy);
}
}
+static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error);
static void lb_on_server_status_received_locked(void* arg, grpc_error* error);
static void lb_on_response_received_locked(void* arg, grpc_error* error);
static void lb_call_init_locked(glb_lb_policy* glb_policy) {
@@ -1456,6 +1320,9 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) {
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
+ GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request,
+ lb_on_sent_initial_request_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
lb_on_server_status_received_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
@@ -1463,13 +1330,16 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) {
lb_on_response_received_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_backoff_init(&glb_policy->lb_call_backoff_state,
- GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
- GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
- GRPC_GRPCLB_RECONNECT_JITTER,
- GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
- GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ grpc_core::BackOff::Options backoff_options;
+ backoff_options
+ .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+
+ glb_policy->lb_call_backoff.Init(backoff_options);
+ glb_policy->initial_request_sent = false;
glb_policy->seen_initial_response = false;
glb_policy->last_client_load_report_counters_were_zero = false;
}
@@ -1485,7 +1355,7 @@ static void lb_call_destroy_locked(glb_lb_policy* glb_policy) {
grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
grpc_slice_unref_internal(glb_policy->lb_call_status_details);
- if (glb_policy->client_load_report_timer_pending) {
+ if (glb_policy->client_load_report_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->client_load_report_timer);
}
}
@@ -1528,8 +1398,11 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op->flags = 0;
op->reserved = nullptr;
op++;
- call_error = grpc_call_start_batch_and_execute(glb_policy->lb_call, ops,
- (size_t)(op - ops), nullptr);
+ /* take a ref to be released in lb_on_sent_initial_request_locked() */
+ GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked");
+ call_error = grpc_call_start_batch_and_execute(
+ glb_policy->lb_call, ops, (size_t)(op - ops),
+ &glb_policy->lb_on_sent_initial_request);
GPR_ASSERT(GRPC_CALL_OK == call_error);
op = ops;
@@ -1542,10 +1415,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op->flags = 0;
op->reserved = nullptr;
op++;
- /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
- * count goes to zero) to be unref'd in lb_on_server_status_received_locked */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
- "lb_on_server_status_received_locked");
+ /* take a ref to be released in lb_on_server_status_received_locked() */
+ GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked");
call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_server_status_received);
@@ -1557,22 +1428,32 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op->flags = 0;
op->reserved = nullptr;
op++;
- /* take another weak ref to be unref'd/reused in
- * lb_on_response_received_locked */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked");
+ /* take a ref to be unref'd/reused in lb_on_response_received_locked() */
+ GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked");
call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
+static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
+ glb_policy->initial_request_sent = true;
+ // If we attempted to send a client load report before the initial request was
+ // sent, send the load report now.
+ if (glb_policy->client_load_report_payload != nullptr) {
+ do_send_client_load_report_locked(glb_policy);
+ }
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked");
+}
+
static void lb_on_response_received_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op* op = ops;
if (glb_policy->lb_response_payload != nullptr) {
- grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
+ glb_policy->lb_call_backoff->Reset();
/* Received data from the LB server. Look inside
* glb_policy->lb_response_payload, for a serverlist. */
grpc_byte_buffer_reader bbr;
@@ -1595,11 +1476,9 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
"client load reporting interval = %" PRIdPTR " milliseconds",
glb_policy, glb_policy->client_stats_report_interval);
}
- /* take a weak ref (won't prevent calling of \a glb_shutdown() if the
- * strong ref count goes to zero) to be unref'd in
- * send_client_load_report_locked() */
- glb_policy->client_load_report_timer_pending = true;
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report");
+ /* take a ref to be unref'd in send_client_load_report_locked() */
+ glb_policy->client_load_report_timer_callback_pending = true;
+ GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report");
schedule_next_client_load_report(glb_policy);
} else if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
@@ -1647,9 +1526,8 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
/* or dispose of the fallback */
grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses = nullptr;
- if (glb_policy->fallback_timer_active) {
+ if (glb_policy->fallback_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_fallback_timer);
- glb_policy->fallback_timer_active = false;
}
}
/* and update the copy in the glb_lb_policy instance. This
@@ -1682,27 +1560,27 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
op->flags = 0;
op->reserved = nullptr;
op++;
- /* reuse the "lb_on_response_received_locked" weak ref taken in
+ /* reuse the "lb_on_response_received_locked" ref taken in
* query_for_backends_locked() */
const grpc_call_error call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received); /* loop */
GPR_ASSERT(GRPC_CALL_OK == call_error);
} else {
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
- "lb_on_response_received_locked_shutdown");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base,
+ "lb_on_response_received_locked_shutdown");
}
} else { /* empty payload: call cancelled. */
- /* dispose of the "lb_on_response_received_locked" weak ref taken in
+ /* dispose of the "lb_on_response_received_locked" ref taken in
* query_for_backends_locked() and reused in every reception loop */
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
- "lb_on_response_received_locked_empty_payload");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base,
+ "lb_on_response_received_locked_empty_payload");
}
}
static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- glb_policy->fallback_timer_active = false;
+ glb_policy->fallback_timer_callback_pending = false;
/* If we receive a serverlist after the timer fires but before this callback
* actually runs, don't fall back. */
if (glb_policy->serverlist == nullptr) {
@@ -1716,7 +1594,7 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
rr_handover_locked(glb_policy);
}
}
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_fallback_timer");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
}
static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
@@ -1737,7 +1615,7 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
// If the load report timer is still pending, we wait for it to be
// called before restarting the call. Otherwise, we restart the call
// here.
- if (!glb_policy->client_load_report_timer_pending) {
+ if (!glb_policy->client_load_report_timer_callback_pending) {
maybe_restart_lb_call(glb_policy);
}
}
@@ -1800,7 +1678,7 @@ static void glb_update_locked(grpc_lb_policy* policy,
grpc_channel_get_channel_stack(glb_policy->lb_channel));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
glb_policy->watching_lb_channel = true;
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity");
+ GRPC_LB_POLICY_REF(&glb_policy->base, "watch_lb_channel_connectivity");
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
@@ -1847,9 +1725,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
// lb_on_server_status_received() will pick up the cancel and reinit
// lb_call.
} else if (glb_policy->started_picking) {
- if (glb_policy->retry_timer_active) {
+ if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
- glb_policy->retry_timer_active = false;
}
start_picking_locked(glb_policy);
}
@@ -1857,8 +1734,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
case GRPC_CHANNEL_SHUTDOWN:
done:
glb_policy->watching_lb_channel = false;
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
- "watch_lb_channel_connectivity_cb_shutdown");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base,
+ "watch_lb_channel_connectivity_cb_shutdown");
break;
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc
index a8ecea4212..1e7f34bdc7 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc
@@ -22,8 +22,8 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
-#include "src/core/lib/support/string.h"
grpc_channel* grpc_lb_policy_grpclb_create_lb_channel(
const char* lb_service_target_addresses,
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
index 76bcddf945..15233d371c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
@@ -22,11 +22,11 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/transport/lb_targets_info.h"
#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/support/string.h"
grpc_channel* grpc_lb_policy_grpclb_create_lb_channel(
const char* lb_service_target_addresses,
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 0861261359..725b78d478 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -31,13 +31,6 @@
grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
-typedef struct pending_pick {
- struct pending_pick* next;
- uint32_t initial_metadata_flags;
- grpc_connected_subchannel** target;
- grpc_closure* on_complete;
-} pending_pick;
-
typedef struct {
/** base policy: must be first */
grpc_lb_policy base;
@@ -52,7 +45,7 @@ typedef struct {
/** are we shut down? */
bool shutdown;
/** list of picks that are waiting on connectivity */
- pending_pick* pending_picks;
+ grpc_lb_policy_pick_state* pending_picks;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
@@ -70,19 +63,27 @@ static void pf_destroy(grpc_lb_policy* pol) {
}
}
-static void pf_shutdown_locked(grpc_lb_policy* pol) {
+static void pf_shutdown_locked(grpc_lb_policy* pol,
+ grpc_lb_policy* new_policy) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
}
p->shutdown = true;
- pending_pick* pp;
- while ((pp = p->pending_picks) != nullptr) {
- p->pending_picks = pp->next;
- *pp->target = nullptr;
- GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error));
- gpr_free(pp);
+ grpc_lb_policy_pick_state* pick;
+ while ((pick = p->pending_picks) != nullptr) {
+ p->pending_picks = pick->next;
+ if (new_policy != nullptr) {
+ // Hand off to new LB policy.
+ if (grpc_lb_policy_pick_locked(new_policy, pick)) {
+ // Synchronous return, schedule closure.
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ }
+ } else {
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
+ }
}
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown");
@@ -102,19 +103,18 @@ static void pf_shutdown_locked(grpc_lb_policy* pol) {
}
static void pf_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_lb_policy_pick_state* pick,
grpc_error* error) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
- pending_pick* pp = p->pending_picks;
+ grpc_lb_policy_pick_state* pp = p->pending_picks;
p->pending_picks = nullptr;
while (pp != nullptr) {
- pending_pick* next = pp->next;
- if (pp->target == target) {
- *target = nullptr;
- GRPC_CLOSURE_SCHED(pp->on_complete,
+ grpc_lb_policy_pick_state* next = pp->next;
+ if (pp == pick) {
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
- gpr_free(pp);
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
@@ -129,21 +129,20 @@ static void pf_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
- pending_pick* pp = p->pending_picks;
+ grpc_lb_policy_pick_state* pick = p->pending_picks;
p->pending_picks = nullptr;
- while (pp != nullptr) {
- pending_pick* next = pp->next;
- if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ while (pick != nullptr) {
+ grpc_lb_policy_pick_state* next = pick->next;
+ if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- GRPC_CLOSURE_SCHED(pp->on_complete,
+ GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
- gpr_free(pp);
} else {
- pp->next = p->pending_picks;
- p->pending_picks = pp;
+ pick->next = p->pending_picks;
+ p->pending_picks = pick;
}
- pp = next;
+ pick = next;
}
GRPC_ERROR_UNREF(error);
}
@@ -173,27 +172,19 @@ static void pf_exit_idle_locked(grpc_lb_policy* pol) {
}
static int pf_pick_locked(grpc_lb_policy* pol,
- const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
- grpc_call_context_element* context, void** user_data,
- grpc_closure* on_complete) {
+ grpc_lb_policy_pick_state* pick) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
// If we have a selected subchannel already, return synchronously.
if (p->selected != nullptr) {
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel,
- "picked");
+ pick->connected_subchannel = p->selected->connected_subchannel;
return 1;
}
// No subchannel selected yet, so handle asynchronously.
if (!p->started_picking) {
start_picking_locked(p);
}
- pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp));
- pp->next = p->pending_picks;
- pp->target = target;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->on_complete = on_complete;
- p->pending_picks = pp;
+ pick->next = p->pending_picks;
+ p->pending_picks = pick;
return 0;
}
@@ -225,8 +216,7 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
grpc_closure* on_ack) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
if (p->selected) {
- grpc_connected_subchannel_ping(p->selected->connected_subchannel,
- on_initiate, on_ack);
+ p->selected->connected_subchannel->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
@@ -305,8 +295,7 @@ static void pf_update_locked(grpc_lb_policy* policy,
subchannel_list->num_subchannels);
}
if (p->selected->connected_subchannel != nullptr) {
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- p->selected->connected_subchannel, "pf_update_includes_selected");
+ sd->connected_subchannel = p->selected->connected_subchannel;
}
p->selected = sd;
if (p->subchannel_list != nullptr) {
@@ -418,8 +407,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
- sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
@@ -427,20 +416,19 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
p->started_picking = false;
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_NONE);
+ // in transient failure. Rely on re-resolution to recover.
+ p->selected = nullptr;
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ sd->subchannel_list, "pf_selected_shutdown");
+ grpc_lb_subchannel_data_unref_subchannel(
+ sd, "pf_selected_shutdown"); // Unrefs connected subchannel
} else {
grpc_connectivity_state_set(&p->state_tracker,
sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
- }
- if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
- } else {
- p->selected = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_selected_shutdown");
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown");
}
}
return;
@@ -458,6 +446,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list.
+ sd->connected_subchannel =
+ grpc_subchannel_get_connected_subchannel(sd->subchannel);
if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != nullptr);
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
@@ -468,9 +458,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "connected");
p->selected = sd;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
@@ -479,18 +466,16 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Drop all other subchannels, since we are now connected.
destroy_unselected_subchannels_locked(p);
// Update any calls that were waiting for a pick.
- pending_pick* pp;
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
- p->selected->connected_subchannel, "picked");
+ grpc_lb_policy_pick_state* pick;
+ while ((pick = p->pending_picks)) {
+ p->pending_picks = pick->next;
+ pick->connected_subchannel = p->selected->connected_subchannel;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
(void*)p->selected);
}
- GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
- gpr_free(pp);
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
@@ -529,39 +514,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data_start_connectivity_watch(sd);
break;
}
- case GRPC_CHANNEL_SHUTDOWN: {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown");
- // Advance to next subchannel and check its state.
- grpc_lb_subchannel_data* original_sd = sd;
- do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == nullptr && sd != original_sd);
- if (sd == original_sd) {
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_exhausted_subchannels");
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
- GRPC_ERROR_NONE,
- "exhausted_subchannels+reresolve");
- p->started_picking = false;
- grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
- GRPC_ERROR_NONE);
- }
- } else {
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "subchannel_failed");
- }
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- }
- }
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE(break);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index b0c84017df..e217a0b0c0 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -34,6 +34,7 @@
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gpr++/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -41,29 +42,6 @@
grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
-/** List of entities waiting for a pick.
- *
- * Once a pick is available, \a target is updated and \a on_complete called. */
-typedef struct pending_pick {
- struct pending_pick* next;
-
- /* output argument where to store the pick()ed user_data. It'll be NULL if no
- * such data is present or there's an error (the definite test for errors is
- * \a target being NULL). */
- void** user_data;
-
- /* bitmask passed to pick() and used for selective cancelling. See
- * grpc_lb_policy_cancel_picks() */
- uint32_t initial_metadata_flags;
-
- /* output argument where to store the pick()ed connected subchannel, or NULL
- * upon error. */
- grpc_connected_subchannel** target;
-
- /* to be invoked once the pick() has completed (regardless of success) */
- grpc_closure* on_complete;
-} pending_pick;
-
typedef struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
@@ -75,7 +53,7 @@ typedef struct round_robin_lb_policy {
/** are we shutting down? */
bool shutdown;
/** List of picks that are waiting on connectivity */
- pending_pick* pending_picks;
+ grpc_lb_policy_pick_state* pending_picks;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
@@ -150,7 +128,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p,
(void*)p, (unsigned long)last_ready_index,
(void*)p->subchannel_list->subchannels[last_ready_index].subchannel,
(void*)p->subchannel_list->subchannels[last_ready_index]
- .connected_subchannel);
+ .connected_subchannel.get());
}
}
@@ -167,19 +145,27 @@ static void rr_destroy(grpc_lb_policy* pol) {
gpr_free(p);
}
-static void rr_shutdown_locked(grpc_lb_policy* pol) {
+static void rr_shutdown_locked(grpc_lb_policy* pol,
+ grpc_lb_policy* new_policy) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
}
p->shutdown = true;
- pending_pick* pp;
- while ((pp = p->pending_picks) != nullptr) {
- p->pending_picks = pp->next;
- *pp->target = nullptr;
- GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error));
- gpr_free(pp);
+ grpc_lb_policy_pick_state* pick;
+ while ((pick = p->pending_picks) != nullptr) {
+ p->pending_picks = pick->next;
+ if (new_policy != nullptr) {
+ // Hand off to new LB policy.
+ if (grpc_lb_policy_pick_locked(new_policy, pick)) {
+ // Synchronous return; schedule callback.
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ }
+ } else {
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
+ }
}
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "rr_shutdown");
@@ -199,19 +185,18 @@ static void rr_shutdown_locked(grpc_lb_policy* pol) {
}
static void rr_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_lb_policy_pick_state* pick,
grpc_error* error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
- pending_pick* pp = p->pending_picks;
+ grpc_lb_policy_pick_state* pp = p->pending_picks;
p->pending_picks = nullptr;
while (pp != nullptr) {
- pending_pick* next = pp->next;
- if (pp->target == target) {
- *target = nullptr;
- GRPC_CLOSURE_SCHED(pp->on_complete,
+ grpc_lb_policy_pick_state* next = pp->next;
+ if (pp == pick) {
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
- gpr_free(pp);
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
@@ -226,22 +211,21 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
- pending_pick* pp = p->pending_picks;
+ grpc_lb_policy_pick_state* pick = p->pending_picks;
p->pending_picks = nullptr;
- while (pp != nullptr) {
- pending_pick* next = pp->next;
- if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ while (pick != nullptr) {
+ grpc_lb_policy_pick_state* next = pick->next;
+ if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- *pp->target = nullptr;
- GRPC_CLOSURE_SCHED(pp->on_complete,
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
- gpr_free(pp);
} else {
- pp->next = p->pending_picks;
- p->pending_picks = pp;
+ pick->next = p->pending_picks;
+ p->pending_picks = pick;
}
- pp = next;
+ pick = next;
}
GRPC_ERROR_UNREF(error);
}
@@ -266,13 +250,10 @@ static void rr_exit_idle_locked(grpc_lb_policy* pol) {
}
static int rr_pick_locked(grpc_lb_policy* pol,
- const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
- grpc_call_context_element* context, void** user_data,
- grpc_closure* on_complete) {
+ grpc_lb_policy_pick_state* pick) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", (void*)pol,
+ gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol,
p->shutdown);
}
GPR_ASSERT(!p->shutdown);
@@ -282,18 +263,17 @@ static int rr_pick_locked(grpc_lb_policy* pol,
/* readily available, report right away */
grpc_lb_subchannel_data* sd =
&p->subchannel_list->subchannels[next_ready_index];
- *target =
- GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked");
- if (user_data != nullptr) {
- *user_data = sd->user_data;
+ pick->connected_subchannel = sd->connected_subchannel;
+ if (pick->user_data != nullptr) {
+ *pick->user_data = sd->user_data;
}
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
- "index %lu)",
- (void*)p, (void*)sd->subchannel, (void*)*target,
- (void*)sd->subchannel_list, (unsigned long)next_ready_index);
+ "index %" PRIuPTR ")",
+ p, sd->subchannel, pick->connected_subchannel.get(),
+ sd->subchannel_list, next_ready_index);
}
/* only advance the last picked pointer if the selection was used */
update_last_ready_subchannel_index_locked(p, next_ready_index);
@@ -304,27 +284,21 @@ static int rr_pick_locked(grpc_lb_policy* pol,
if (!p->started_picking) {
start_picking_locked(p);
}
- pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp));
- pp->next = p->pending_picks;
- pp->target = target;
- pp->on_complete = on_complete;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->user_data = user_data;
- p->pending_picks = pp;
+ pick->next = p->pending_picks;
+ p->pending_picks = pick;
return 0;
}
static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
+ GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(subchannel_list->num_ready > 0);
--subchannel_list->num_ready;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(subchannel_list->num_transient_failures > 0);
--subchannel_list->num_transient_failures;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- GPR_ASSERT(subchannel_list->num_shutdown > 0);
- --subchannel_list->num_shutdown;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle;
@@ -334,8 +308,6 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++subchannel_list->num_transient_failures;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- ++subchannel_list->num_shutdown;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
++subchannel_list->num_idle;
}
@@ -435,6 +407,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// either the current or latest pending subchannel lists.
GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
sd->subchannel_list == p->latest_pending_subchannel_list);
+ GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN);
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
@@ -442,18 +415,17 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// Update state counters and new overall state.
update_state_counters_locked(sd);
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
- // If the sd's new state is SHUTDOWN, unref the subchannel.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "rr_connectivity_shutdown");
- } else { // sd not in SHUTDOWN
- if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ // If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
+ // subchannel, if any.
+ switch (sd->curr_connectivity_state) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ sd->connected_subchannel.reset();
+ break;
+ }
+ case GRPC_CHANNEL_READY: {
if (sd->connected_subchannel == nullptr) {
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "connected");
+ sd->connected_subchannel =
+ grpc_subchannel_get_connected_subchannel(sd->subchannel);
}
if (sd->subchannel_list != p->subchannel_list) {
// promote sd->subchannel_list to p->subchannel_list.
@@ -493,13 +465,12 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// picks, update the last picked pointer
update_last_ready_subchannel_index_locked(p, next_ready_index);
}
- pending_pick* pp;
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
- selected->connected_subchannel, "rr_picked");
- if (pp->user_data != nullptr) {
- *pp->user_data = selected->user_data;
+ grpc_lb_policy_pick_state* pick;
+ while ((pick = p->pending_picks)) {
+ p->pending_picks = pick->next;
+ pick->connected_subchannel = selected->connected_subchannel;
+ if (pick->user_data != nullptr) {
+ *pick->user_data = selected->user_data;
}
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
@@ -508,13 +479,17 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
(void*)p, (void*)selected->subchannel,
(void*)p->subchannel_list, (unsigned long)next_ready_index);
}
- GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
- gpr_free(pp);
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
+ break;
}
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE(return );
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_IDLE:; // fallthrough
}
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
static grpc_connectivity_state rr_check_connectivity_locked(
@@ -538,10 +513,9 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
if (next_ready_index < p->subchannel_list->num_subchannels) {
grpc_lb_subchannel_data* selected =
&p->subchannel_list->subchannels[next_ready_index];
- grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
- selected->connected_subchannel, "rr_ping");
- grpc_connected_subchannel_ping(target, on_initiate, on_ack);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping");
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target =
+ selected->connected_subchannel;
+ target->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
index a3b4c8e524..fa2ffcc796 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
@@ -42,10 +42,7 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
}
GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason);
sd->subchannel = nullptr;
- if (sd->connected_subchannel != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, reason);
- sd->connected_subchannel = nullptr;
- }
+ sd->connected_subchannel.reset();
if (sd->user_data != nullptr) {
GPR_ASSERT(sd->user_data_vtable != nullptr);
sd->user_data_vtable->destroy(sd->user_data);
@@ -213,13 +210,13 @@ void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
void grpc_lb_subchannel_list_ref_for_connectivity_watch(
grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason);
+ GRPC_LB_POLICY_REF(subchannel_list->policy, reason);
grpc_lb_subchannel_list_ref(subchannel_list, reason);
}
void grpc_lb_subchannel_list_unref_for_connectivity_watch(
grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- GRPC_LB_POLICY_WEAK_UNREF(subchannel_list->policy, reason);
+ GRPC_LB_POLICY_UNREF(subchannel_list->policy, reason);
grpc_lb_subchannel_list_unref(subchannel_list, reason);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 0f8cea9347..f4e345def6 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -22,6 +22,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gpr++/ref_counted_ptr.h"
#include "src/core/lib/transport/connectivity_state.h"
// TODO(roth): This code is intended to be shared between pick_first and
@@ -43,7 +44,7 @@ typedef struct {
grpc_lb_subchannel_list* subchannel_list;
/** subchannel itself */
grpc_subchannel* subchannel;
- grpc_connected_subchannel* connected_subchannel;
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
/** Is a connectivity notification pending? */
bool connectivity_notification_pending;
/** notification that connectivity has changed on subchannel */
diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.cc b/src/core/ext/filters/client_channel/lb_policy_registry.cc
index edd0330c6a..8414504e8f 100644
--- a/src/core/ext/filters/client_channel/lb_policy_registry.cc
+++ b/src/core/ext/filters/client_channel/lb_policy_registry.cc
@@ -20,7 +20,7 @@
#include <string.h>
-#include "src/core/lib/support/string.h"
+#include "src/core/lib/gpr/string.h"
#define MAX_POLICIES 10
diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc
index 39b1237c77..c3309e36a3 100644
--- a/src/core/ext/filters/client_channel/parse_address.cc
+++ b/src/core/ext/filters/client_channel/parse_address.cc
@@ -29,7 +29,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/support/string.h"
+#include "src/core/lib/gpr/string.h"
#ifdef GRPC_HAVE_UNIX_SOCKET
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 4ec4477c82..1efdc26d56 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -34,16 +34,16 @@
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr++/manual_constructor.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/gethostname.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
-#include "src/core/lib/support/env.h"
-#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/service_config.h"
-#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1
#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120
@@ -89,7 +89,7 @@ typedef struct {
bool have_retry_timer;
grpc_timer retry_timer;
/** retry backoff state */
- grpc_backoff backoff_state;
+ grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
/** currently resolving addresses */
grpc_lb_addresses* lb_addresses;
@@ -131,7 +131,7 @@ static void dns_ares_shutdown_locked(grpc_resolver* resolver) {
static void dns_ares_channel_saw_error_locked(grpc_resolver* resolver) {
ares_dns_resolver* r = (ares_dns_resolver*)resolver;
if (!r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_ares_start_resolving_locked(r);
}
}
@@ -264,8 +264,7 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) {
} else {
const char* msg = grpc_error_string(error);
gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg);
- grpc_millis next_try =
- grpc_backoff_step(&r->backoff_state).next_attempt_start_time;
+ grpc_millis next_try = r->backoff->NextAttemptTime();
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
@@ -298,7 +297,7 @@ static void dns_ares_next_locked(grpc_resolver* resolver,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_ares_start_resolving_locked(r);
} else {
dns_ares_maybe_finish_next_locked(r);
@@ -368,11 +367,13 @@ static grpc_resolver* dns_ares_create(grpc_resolver_args* args,
if (args->pollset_set != nullptr) {
grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set);
}
- grpc_backoff_init(
- &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
- GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER,
- GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
- GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ grpc_core::BackOff::Options backoff_options;
+ backoff_options
+ .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_DNS_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ r->backoff.Init(grpc_core::BackOff(backoff_options));
GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked,
dns_ares_on_retry_timer_locked, r,
grpc_combiner_scheduler(r->base.combiner));
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
index 40e264504c..2eb2a9b59d 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
@@ -30,10 +30,10 @@
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
-#include "src/core/lib/support/string.h"
typedef struct fd_node {
/** the owner of this fd node */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index 3a870b2d06..2b35bdb605 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -36,12 +36,12 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/nameser.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
-#include "src/core/lib/support/string.h"
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index 77698e97aa..66a03c5a85 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -29,13 +29,13 @@
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr++/manual_constructor.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/support/env.h"
-#include "src/core/lib/support/string.h"
-#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1
#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120
@@ -70,7 +70,7 @@ typedef struct {
grpc_timer retry_timer;
grpc_closure on_retry;
/** retry backoff state */
- grpc_backoff backoff_state;
+ grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
/** currently resolving addresses */
grpc_resolved_addresses* addresses;
@@ -106,7 +106,7 @@ static void dns_shutdown_locked(grpc_resolver* resolver) {
static void dns_channel_saw_error_locked(grpc_resolver* resolver) {
dns_resolver* r = (dns_resolver*)resolver;
if (!r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_start_resolving_locked(r);
}
}
@@ -119,7 +119,7 @@ static void dns_next_locked(grpc_resolver* resolver,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_start_resolving_locked(r);
} else {
dns_maybe_finish_next_locked(r);
@@ -161,8 +161,7 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) {
grpc_resolved_addresses_destroy(r->addresses);
grpc_lb_addresses_destroy(addresses);
} else {
- grpc_millis next_try =
- grpc_backoff_step(&r->backoff_state).next_attempt_start_time;
+ grpc_millis next_try = r->backoff->NextAttemptTime();
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
@@ -244,11 +243,13 @@ static grpc_resolver* dns_create(grpc_resolver_args* args,
if (args->pollset_set != nullptr) {
grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set);
}
- grpc_backoff_init(
- &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
- GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER,
- GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
- GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ grpc_core::BackOff::Options backoff_options;
+ backoff_options
+ .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_DNS_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ r->backoff.Init(grpc_core::BackOff(backoff_options));
return &r->base;
}
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
index fe3ad1403c..eaa5e6ac49 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
@@ -32,13 +32,13 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
-#include "src/core/lib/support/string.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
index 7d1e283fa3..99ad78e23c 100644
--- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
@@ -30,12 +30,12 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
-#include "src/core/lib/support/string.h"
typedef struct {
/** base class: must be first */
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 84a5ace31d..bb43651d0c 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -20,7 +20,9 @@
#include <inttypes.h>
#include <limits.h>
-#include <string.h>
+
+#include <algorithm>
+#include <cstring>
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
@@ -35,6 +37,8 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/gpr++/debug_location.h"
+#include "src/core/lib/gpr++/manual_constructor.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
@@ -48,19 +52,17 @@
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
-#define GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS 20
+#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
-#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
- ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \
- &(subchannel)->connected_subchannel)))
-
-typedef struct {
+namespace {
+struct state_watcher {
grpc_closure closure;
grpc_subchannel* subchannel;
grpc_connectivity_state connectivity_state;
-} state_watcher;
+};
+} // namespace
typedef struct external_state_watcher {
grpc_subchannel* subchannel;
@@ -93,7 +95,7 @@ struct grpc_subchannel {
grpc_connect_out_args connecting_result;
/** callback for connection finishing */
- grpc_closure connected;
+ grpc_closure on_connected;
/** callback for our alarm */
grpc_closure on_alarm;
@@ -102,12 +104,13 @@ struct grpc_subchannel {
being setup */
grpc_pollset_set* pollset_set;
- /** active connection, or null; of type grpc_connected_subchannel */
- gpr_atm connected_subchannel;
-
/** mutex protecting remaining elements */
gpr_mu mu;
+ /** active connection, or null; of type grpc_core::ConnectedSubchannel
+ */
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
+
/** have we seen a disconnection? */
bool disconnected;
/** are we connecting */
@@ -118,8 +121,9 @@ struct grpc_subchannel {
external_state_watcher root_external_state_watcher;
/** backoff state */
- grpc_backoff backoff_state;
- grpc_backoff_result backoff_result;
+ grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
+ grpc_millis next_attempt_deadline;
+ grpc_millis min_connect_timeout_ms;
/** do we have an active alarm? */
bool have_alarm;
@@ -130,16 +134,15 @@ struct grpc_subchannel {
};
struct grpc_subchannel_call {
- grpc_connected_subchannel* connection;
+ grpc_core::ConnectedSubchannel* connection;
grpc_closure* schedule_closure_after_destroy;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1))
-#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack*)(con))
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
(((grpc_subchannel_call*)(callstack)) - 1)
-static void subchannel_connected(void* subchannel, grpc_error* error);
+static void on_subchannel_connected(void* subchannel, grpc_error* error);
#ifndef NDEBUG
#define REF_REASON reason
@@ -157,20 +160,9 @@ static void subchannel_connected(void* subchannel, grpc_error* error);
*/
static void connection_destroy(void* arg, grpc_error* error) {
- grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg;
- grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
- gpr_free(c);
-}
-
-grpc_connected_subchannel* grpc_connected_subchannel_ref(
- grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
- return c;
-}
-
-void grpc_connected_subchannel_unref(
- grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
+ grpc_channel_stack* stk = (grpc_channel_stack*)arg;
+ grpc_channel_stack_destroy(stk);
+ gpr_free(stk);
}
/*
@@ -237,18 +229,13 @@ grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
}
static void disconnect(grpc_subchannel* c) {
- grpc_connected_subchannel* con;
grpc_subchannel_index_unregister(c->key, c);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
c->disconnected = true;
grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Subchannel disconnected"));
- con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
- if (con != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "connection");
- gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef);
- }
+ c->connected_subchannel.reset();
gpr_mu_unlock(&c->mu);
}
@@ -274,6 +261,54 @@ void grpc_subchannel_weak_unref(
}
}
+static void parse_args_for_backoff_values(
+ const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options,
+ grpc_millis* min_connect_timeout_ms) {
+ grpc_millis initial_backoff_ms =
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
+ *min_connect_timeout_ms =
+ GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
+ grpc_millis max_backoff_ms =
+ GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
+ bool fixed_reconnect_backoff = false;
+ if (args != nullptr) {
+ for (size_t i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key,
+ "grpc.testing.fixed_reconnect_backoff_ms")) {
+ fixed_reconnect_backoff = true;
+ initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
+ grpc_channel_arg_get_integer(
+ &args->args[i],
+ {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
+ } else if (0 ==
+ strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ *min_connect_timeout_ms = grpc_channel_arg_get_integer(
+ &args->args[i],
+ {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
+ } else if (0 ==
+ strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ max_backoff_ms = grpc_channel_arg_get_integer(
+ &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ initial_backoff_ms = grpc_channel_arg_get_integer(
+ &args->args[i],
+ {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
+ }
+ }
+ }
+ backoff_options->set_initial_backoff(initial_backoff_ms)
+ .set_multiplier(fixed_reconnect_backoff
+ ? 1.0
+ : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(fixed_reconnect_backoff ? 0.0
+ : GRPC_SUBCHANNEL_RECONNECT_JITTER)
+ .set_max_backoff(max_backoff_ms);
+}
+
grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
const grpc_subchannel_args* args) {
grpc_subchannel_key* key = grpc_subchannel_key_create(args);
@@ -320,47 +355,14 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
if (new_args != nullptr) grpc_channel_args_destroy(new_args);
c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
- GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c,
+ GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c,
grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
- int initial_backoff_ms =
- GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
- int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000;
- int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
- bool fixed_reconnect_backoff = false;
- if (c->args) {
- for (size_t i = 0; i < c->args->num_args; i++) {
- if (0 == strcmp(c->args->args[i].key,
- "grpc.testing.fixed_reconnect_backoff_ms")) {
- fixed_reconnect_backoff = true;
- initial_backoff_ms = min_backoff_ms = max_backoff_ms =
- grpc_channel_arg_get_integer(&c->args->args[i],
- {initial_backoff_ms, 100, INT_MAX});
- } else if (0 == strcmp(c->args->args[i].key,
- GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
- fixed_reconnect_backoff = false;
- min_backoff_ms = grpc_channel_arg_get_integer(
- &c->args->args[i], {min_backoff_ms, 100, INT_MAX});
- } else if (0 == strcmp(c->args->args[i].key,
- GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
- fixed_reconnect_backoff = false;
- max_backoff_ms = grpc_channel_arg_get_integer(
- &c->args->args[i], {max_backoff_ms, 100, INT_MAX});
- } else if (0 == strcmp(c->args->args[i].key,
- GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
- fixed_reconnect_backoff = false;
- initial_backoff_ms = grpc_channel_arg_get_integer(
- &c->args->args[i], {initial_backoff_ms, 100, INT_MAX});
- }
- }
- }
- grpc_backoff_init(
- &c->backoff_state, initial_backoff_ms,
- fixed_reconnect_backoff ? 1.0
- : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
- fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER,
- min_backoff_ms, max_backoff_ms);
+ grpc_core::BackOff::Options backoff_options;
+ parse_args_for_backoff_values(args->args, &backoff_options,
+ &c->min_connect_timeout_ms);
+ c->backoff.Init(backoff_options);
gpr_mu_init(&c->mu);
return grpc_subchannel_index_register(key, c);
@@ -368,15 +370,16 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
static void continue_connect_locked(grpc_subchannel* c) {
grpc_connect_in_args args;
-
args.interested_parties = c->pollset_set;
- args.deadline = c->backoff_result.current_deadline;
+ const grpc_millis min_deadline =
+ c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now();
+ c->next_attempt_deadline = c->backoff->NextAttemptTime();
+ args.deadline = std::max(c->next_attempt_deadline, min_deadline);
args.channel_args = c->args;
-
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "state_change");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
- &c->connected);
+ &c->on_connected);
}
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c,
@@ -416,7 +419,6 @@ static void on_alarm(void* arg, grpc_error* error) {
}
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
- c->backoff_result = grpc_backoff_step(&c->backoff_state);
continue_connect_locked(c);
gpr_mu_unlock(&c->mu);
} else {
@@ -437,7 +439,7 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
return;
}
- if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != nullptr) {
+ if (c->connected_subchannel != nullptr) {
/* Already connected: don't restart */
return;
}
@@ -452,22 +454,20 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
if (!c->backoff_begun) {
c->backoff_begun = true;
- c->backoff_result = grpc_backoff_begin(&c->backoff_state);
continue_connect_locked(c);
} else {
GPR_ASSERT(!c->have_alarm);
c->have_alarm = true;
const grpc_millis time_til_next =
- c->backoff_result.next_attempt_start_time -
- grpc_core::ExecCtx::Get()->Now();
+ c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now();
if (time_til_next <= 0) {
- gpr_log(GPR_INFO, "Retry immediately");
+ gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c);
} else {
- gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next);
+ gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRIdPTR " milliseconds", c,
+ time_til_next);
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
- grpc_timer_init(&c->alarm, c->backoff_result.next_attempt_start_time,
- &c->on_alarm);
+ grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
}
}
@@ -508,75 +508,56 @@ void grpc_subchannel_notify_on_state_change(
}
}
-void grpc_connected_subchannel_process_transport_op(
- grpc_connected_subchannel* con, grpc_transport_op* op) {
- grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0);
- top_elem->filter->start_transport_op(top_elem, op);
-}
-
-static void subchannel_on_child_state_changed(void* p, grpc_error* error) {
- state_watcher* sw = (state_watcher*)p;
- grpc_subchannel* c = sw->subchannel;
+static void on_connected_subchannel_connectivity_changed(void* p,
+ grpc_error* error) {
+ state_watcher* connected_subchannel_watcher = (state_watcher*)p;
+ grpc_subchannel* c = connected_subchannel_watcher->subchannel;
gpr_mu* mu = &c->mu;
gpr_mu_lock(mu);
- /* if we failed just leave this closure */
- if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- /* any errors on a subchannel ==> we're done, create a new one */
- sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
- }
- grpc_connectivity_state_set(&c->state_tracker, sw->connectivity_state,
- GRPC_ERROR_REF(error), "reflect_child");
- if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_connected_subchannel_notify_on_state_change(
- GET_CONNECTED_SUBCHANNEL(c, no_barrier), nullptr,
- &sw->connectivity_state, &sw->closure);
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- sw = nullptr;
+ switch (connected_subchannel_watcher->connectivity_state) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN: {
+ if (!c->disconnected && c->connected_subchannel != nullptr) {
+ if (grpc_trace_stream_refcount.enabled()) {
+ gpr_log(GPR_INFO,
+ "Connected subchannel %p of subchannel %p has gone into %s. "
+ "Attempting to reconnect.",
+ c->connected_subchannel.get(), c,
+ grpc_connectivity_state_name(
+ connected_subchannel_watcher->connectivity_state));
+ }
+ c->connected_subchannel.reset();
+ grpc_connectivity_state_set(&c->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "reflect_child");
+ c->backoff_begun = false;
+ c->backoff->Reset();
+ maybe_start_connecting_locked(c);
+ } else {
+ connected_subchannel_watcher->connectivity_state =
+ GRPC_CHANNEL_SHUTDOWN;
+ }
+ break;
+ }
+ default: {
+ grpc_connectivity_state_set(
+ &c->state_tracker, connected_subchannel_watcher->connectivity_state,
+ GRPC_ERROR_REF(error), "reflect_child");
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ c->connected_subchannel->NotifyOnStateChange(
+ nullptr, &connected_subchannel_watcher->connectivity_state,
+ &connected_subchannel_watcher->closure);
+ connected_subchannel_watcher = nullptr;
+ }
}
-
gpr_mu_unlock(mu);
GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher");
- gpr_free(sw);
-}
-
-static void connected_subchannel_state_op(grpc_connected_subchannel* con,
- grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state,
- grpc_closure* closure) {
- grpc_transport_op* op = grpc_make_transport_op(nullptr);
- grpc_channel_element* elem;
- op->connectivity_state = state;
- op->on_connectivity_state_change = closure;
- op->bind_pollset_set = interested_parties;
- elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(elem, op);
-}
-
-void grpc_connected_subchannel_notify_on_state_change(
- grpc_connected_subchannel* con, grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state, grpc_closure* closure) {
- connected_subchannel_state_op(con, interested_parties, state, closure);
-}
-
-void grpc_connected_subchannel_ping(grpc_connected_subchannel* con,
- grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- grpc_transport_op* op = grpc_make_transport_op(nullptr);
- grpc_channel_element* elem;
- op->send_ping.on_initiate = on_initiate;
- op->send_ping.on_ack = on_ack;
- elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(elem, op);
+ gpr_free(connected_subchannel_watcher);
}
static bool publish_transport_locked(grpc_subchannel* c) {
- grpc_connected_subchannel* con;
- grpc_channel_stack* stk;
- state_watcher* sw_subchannel;
-
/* construct channel stack */
grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_channel_arguments(
@@ -588,8 +569,9 @@ static bool publish_transport_locked(grpc_subchannel* c) {
grpc_channel_stack_builder_destroy(builder);
return false;
}
+ grpc_channel_stack* stk;
grpc_error* error = grpc_channel_stack_builder_finish(
- builder, 0, 1, connection_destroy, nullptr, (void**)&con);
+ builder, 0, 1, connection_destroy, nullptr, (void**)&stk);
if (error != GRPC_ERROR_NONE) {
grpc_transport_destroy(c->connecting_result.transport);
gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
@@ -597,38 +579,37 @@ static bool publish_transport_locked(grpc_subchannel* c) {
GRPC_ERROR_UNREF(error);
return false;
}
- stk = CHANNEL_STACK_FROM_CONNECTION(con);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */
- sw_subchannel = (state_watcher*)gpr_malloc(sizeof(*sw_subchannel));
- sw_subchannel->subchannel = c;
- sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
- GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed,
- sw_subchannel, grpc_schedule_on_exec_ctx);
+ state_watcher* connected_subchannel_watcher =
+ (state_watcher*)gpr_zalloc(sizeof(*connected_subchannel_watcher));
+ connected_subchannel_watcher->subchannel = c;
+ connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY;
+ GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure,
+ on_connected_subchannel_connectivity_changed,
+ connected_subchannel_watcher, grpc_schedule_on_exec_ctx);
if (c->disconnected) {
- gpr_free(sw_subchannel);
+ gpr_free(connected_subchannel_watcher);
grpc_channel_stack_destroy(stk);
- gpr_free(con);
+ gpr_free(stk);
return false;
}
/* publish */
- /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
- I'd have expected the rel_cas below to be enough, but
- seemingly it's not.
- Re-evaluate if we really need this. */
- gpr_atm_full_barrier();
- GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
+ c->connected_subchannel.reset(
+ grpc_core::New<grpc_core::ConnectedSubchannel>(stk));
+ gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
+ c->connected_subchannel.get(), c);
/* setup subchannel watching connected subchannel for changes; subchannel
ref for connecting is donated to the state watcher */
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
- grpc_connected_subchannel_notify_on_state_change(
- con, c->pollset_set, &sw_subchannel->connectivity_state,
- &sw_subchannel->closure);
+ c->connected_subchannel->NotifyOnStateChange(
+ c->pollset_set, &connected_subchannel_watcher->connectivity_state,
+ &connected_subchannel_watcher->closure);
/* signal completion */
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY,
@@ -636,11 +617,11 @@ static bool publish_transport_locked(grpc_subchannel* c) {
return true;
}
-static void subchannel_connected(void* arg, grpc_error* error) {
+static void on_subchannel_connected(void* arg, grpc_error* error) {
grpc_subchannel* c = (grpc_subchannel*)arg;
grpc_channel_args* delete_channel_args = c->connecting_result.channel_args;
- GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
+ GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected");
gpr_mu_lock(&c->mu);
c->connecting = false;
if (c->connecting_result.transport != nullptr &&
@@ -675,10 +656,10 @@ static void subchannel_call_destroy(void* call, grpc_error* error) {
grpc_subchannel_call* c = (grpc_subchannel_call*)call;
GPR_ASSERT(c->schedule_closure_after_destroy != nullptr);
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
- grpc_connected_subchannel* connection = c->connection;
+ grpc_core::ConnectedSubchannel* connection = c->connection;
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
c->schedule_closure_after_destroy);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call");
+ connection->Unref(DEBUG_LOCATION, "subchannel_call");
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
@@ -709,9 +690,12 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}
-grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
- grpc_subchannel* c) {
- return GET_CONNECTED_SUBCHANNEL(c, acq);
+grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel>
+grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) {
+ gpr_mu_lock(&c->mu);
+ auto copy = c->connected_subchannel;
+ gpr_mu_unlock(&c->mu);
+ return copy;
}
const grpc_subchannel_key* grpc_subchannel_get_key(
@@ -719,36 +703,6 @@ const grpc_subchannel_key* grpc_subchannel_get_key(
return subchannel->key;
}
-grpc_error* grpc_connected_subchannel_create_call(
- grpc_connected_subchannel* con,
- const grpc_connected_subchannel_call_args* args,
- grpc_subchannel_call** call) {
- grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
- *call = (grpc_subchannel_call*)gpr_arena_alloc(
- args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
- grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
- (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
- const grpc_call_element_args call_args = {
- callstk, /* call_stack */
- nullptr, /* server_transport_data */
- args->context, /* context */
- args->path, /* path */
- args->start_time, /* start_time */
- args->deadline, /* deadline */
- args->arena, /* arena */
- args->call_combiner /* call_combiner */
- };
- grpc_error* error = grpc_call_stack_init(chanstk, 1, subchannel_call_destroy,
- *call, &call_args);
- if (error != GRPC_ERROR_NONE) {
- const char* error_string = grpc_error_string(error);
- gpr_log(GPR_ERROR, "error: %s", error_string);
- return error;
- }
- grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent);
- return GRPC_ERROR_NONE;
-}
-
grpc_call_stack* grpc_subchannel_call_get_call_stack(
grpc_subchannel_call* subchannel_call) {
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
@@ -784,3 +738,64 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
(char*)GRPC_ARG_SUBCHANNEL_ADDRESS,
addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
}
+
+namespace grpc_core {
+ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack)
+ : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount),
+ channel_stack_(channel_stack) {}
+
+ConnectedSubchannel::~ConnectedSubchannel() {
+ GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
+}
+
+void ConnectedSubchannel::NotifyOnStateChange(
+ grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
+ grpc_closure* closure) {
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ grpc_channel_element* elem;
+ op->connectivity_state = state;
+ op->on_connectivity_state_change = closure;
+ op->bind_pollset_set = interested_parties;
+ elem = grpc_channel_stack_element(channel_stack_, 0);
+ elem->filter->start_transport_op(elem, op);
+}
+
+void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ grpc_channel_element* elem;
+ op->send_ping.on_initiate = on_initiate;
+ op->send_ping.on_ack = on_ack;
+ elem = grpc_channel_stack_element(channel_stack_, 0);
+ elem->filter->start_transport_op(elem, op);
+}
+
+grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
+ grpc_subchannel_call** call) {
+ *call = (grpc_subchannel_call*)gpr_arena_alloc(
+ args.arena,
+ sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size);
+ grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
+ Ref(DEBUG_LOCATION, "subchannel_call");
+ (*call)->connection = this;
+ const grpc_call_element_args call_args = {
+ callstk, /* call_stack */
+ nullptr, /* server_transport_data */
+ args.context, /* context */
+ args.path, /* path */
+ args.start_time, /* start_time */
+ args.deadline, /* deadline */
+ args.arena, /* arena */
+ args.call_combiner /* call_combiner */
+ };
+ grpc_error* error = grpc_call_stack_init(
+ channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
+ if (error != GRPC_ERROR_NONE) {
+ const char* error_string = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "error: %s", error_string);
+ return error;
+ }
+ grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
+ return GRPC_ERROR_NONE;
+}
+} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 9d34fff07a..f2a5c1e273 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -21,8 +21,10 @@
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/gpr++/ref_counted.h"
+#include "src/core/lib/gpr++/ref_counted_ptr.h"
+#include "src/core/lib/gpr/arena.h"
#include "src/core/lib/iomgr/polling_entity.h"
-#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
@@ -32,7 +34,6 @@
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
-typedef struct grpc_connected_subchannel grpc_connected_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
typedef struct grpc_subchannel_key grpc_subchannel_key;
@@ -48,10 +49,6 @@ typedef struct grpc_subchannel_key grpc_subchannel_key;
grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \
grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r))
-#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \
- grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \
- grpc_connected_subchannel_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) \
grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \
@@ -65,14 +62,39 @@ typedef struct grpc_subchannel_key grpc_subchannel_key;
#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p))
-#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p))
-#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \
- grpc_connected_subchannel_unref((p))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#endif
+namespace grpc_core {
+class ConnectedSubchannel : public grpc_core::RefCountedWithTracing {
+ public:
+ struct CallArgs {
+ grpc_polling_entity* pollent;
+ grpc_slice path;
+ gpr_timespec start_time;
+ grpc_millis deadline;
+ gpr_arena* arena;
+ grpc_call_context_element* context;
+ grpc_call_combiner* call_combiner;
+ };
+
+ explicit ConnectedSubchannel(grpc_channel_stack* channel_stack);
+ ~ConnectedSubchannel();
+
+ grpc_channel_stack* channel_stack() { return channel_stack_; }
+ void NotifyOnStateChange(grpc_pollset_set* interested_parties,
+ grpc_connectivity_state* state,
+ grpc_closure* closure);
+ void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
+ grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call);
+
+ private:
+ grpc_channel_stack* channel_stack_;
+};
+} // namespace grpc_core
+
grpc_subchannel* grpc_subchannel_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
@@ -83,35 +105,11 @@ grpc_subchannel* grpc_subchannel_weak_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_unref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-grpc_connected_subchannel* grpc_connected_subchannel_ref(
- grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-void grpc_connected_subchannel_unref(
- grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-/** construct a subchannel call */
-typedef struct {
- grpc_polling_entity* pollent;
- grpc_slice path;
- gpr_timespec start_time;
- grpc_millis deadline;
- gpr_arena* arena;
- grpc_call_context_element* context;
- grpc_call_combiner* call_combiner;
-} grpc_connected_subchannel_call_args;
-
-grpc_error* grpc_connected_subchannel_create_call(
- grpc_connected_subchannel* connected_subchannel,
- const grpc_connected_subchannel_call_args* args,
- grpc_subchannel_call** subchannel_call);
-
-/** process a transport level op */
-void grpc_connected_subchannel_process_transport_op(
- grpc_connected_subchannel* subchannel, grpc_transport_op* op);
-
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
grpc_subchannel* channel, grpc_error** error);
@@ -121,17 +119,12 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
void grpc_subchannel_notify_on_state_change(
grpc_subchannel* channel, grpc_pollset_set* interested_parties,
grpc_connectivity_state* state, grpc_closure* notify);
-void grpc_connected_subchannel_notify_on_state_change(
- grpc_connected_subchannel* channel, grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state, grpc_closure* notify);
-void grpc_connected_subchannel_ping(grpc_connected_subchannel* channel,
- grpc_closure* on_initiate,
- grpc_closure* on_ack);
-
-/** retrieve the grpc_connected_subchannel - or NULL if called before
- the subchannel becomes connected */
-grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
- grpc_subchannel* subchannel);
+
+/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected
+ * (which may happen before it initially connects or during transient failures)
+ * */
+grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel>
+grpc_subchannel_get_connected_subchannel(grpc_subchannel* c);
/** return the subchannel index key for \a subchannel */
const grpc_subchannel_key* grpc_subchannel_get_key(
diff --git a/src/core/ext/filters/client_channel/uri_parser.cc b/src/core/ext/filters/client_channel/uri_parser.cc
index 3428f4b54c..c5f2d6822c 100644
--- a/src/core/ext/filters/client_channel/uri_parser.cc
+++ b/src/core/ext/filters/client_channel/uri_parser.cc
@@ -26,10 +26,10 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
-#include "src/core/lib/support/string.h"
/** a size_t default value... maps to all 1's */
#define NOT_SET (~(size_t)0)
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index a1fb10f5b8..5584d50018 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -20,12 +20,12 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <string.h>
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/b64.h"
#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
-#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -35,7 +35,8 @@
/* default maximum size of payload eligable for GET request */
static const size_t kMaxPayloadSizeForGet = 2048;
-typedef struct call_data {
+namespace {
+struct call_data {
grpc_call_combiner* call_combiner;
// State for handling send_initial_metadata ops.
grpc_linked_mdelem method;
@@ -60,13 +61,14 @@ typedef struct call_data {
grpc_closure on_send_message_next_done;
grpc_closure* original_send_message_on_complete;
grpc_closure send_message_on_complete;
-} call_data;
+};
-typedef struct channel_data {
+struct channel_data {
grpc_mdelem static_scheme;
grpc_mdelem user_agent;
size_t max_payload_size_for_get;
-} channel_data;
+};
+} // namespace
static grpc_error* client_filter_incoming_metadata(grpc_call_element* elem,
grpc_metadata_batch* b) {
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
index c17830fa7b..0218ec6e40 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
@@ -29,23 +29,24 @@
#include "src/core/lib/compression/algorithm_metadata.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/compression/message_compress.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
-#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/static_metadata.h"
-typedef enum {
+namespace {
+enum initial_metadata_state {
// Initial metadata not yet seen.
INITIAL_METADATA_UNSEEN = 0,
// Initial metadata seen; compression algorithm set.
HAS_COMPRESSION_ALGORITHM,
// Initial metadata seen; no compression algorithm set.
NO_COMPRESSION_ALGORITHM,
-} initial_metadata_state;
+};
-typedef struct call_data {
+struct call_data {
grpc_call_combiner* call_combiner;
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem stream_compression_algorithm_storage;
@@ -63,9 +64,9 @@ typedef struct call_data {
grpc_closure* original_send_message_on_complete;
grpc_closure send_message_on_complete;
grpc_closure on_send_message_next_done;
-} call_data;
+};
-typedef struct channel_data {
+struct channel_data {
/** The default, channel-level, compression algorithm */
grpc_compression_algorithm default_compression_algorithm;
/** Bitset of enabled compression algorithms */
@@ -74,7 +75,8 @@ typedef struct channel_data {
uint32_t supported_message_compression_algorithms;
/** Supported stream compression algorithms */
uint32_t supported_stream_compression_algorithms;
-} channel_data;
+};
+} // namespace
static bool skip_compression(grpc_call_element* elem, uint32_t flags,
bool has_compression_algorithm) {
diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc
index b872dc98f5..508a3bf9fc 100644
--- a/src/core/ext/filters/http/server/http_server_filter.cc
+++ b/src/core/ext/filters/http/server/http_server_filter.cc
@@ -31,7 +31,8 @@
#define EXPECTED_CONTENT_TYPE "application/grpc"
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
-typedef struct call_data {
+namespace {
+struct call_data {
grpc_call_combiner* call_combiner;
grpc_linked_mdelem status;
@@ -60,11 +61,12 @@ typedef struct call_data {
grpc_closure hs_on_recv;
grpc_closure hs_on_complete;
grpc_closure hs_recv_message_ready;
-} call_data;
+};
-typedef struct channel_data {
+struct channel_data {
uint8_t unused;
-} channel_data;
+};
+} // namespace
static grpc_error* server_filter_outgoing_metadata(grpc_call_element* elem,
grpc_metadata_batch* b) {
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
index f50a928fcd..a414229768 100644
--- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
@@ -31,7 +31,8 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/static_metadata.h"
-typedef struct call_data {
+namespace {
+struct call_data {
intptr_t id; /**< an id unique to the call */
bool have_trailing_md_string;
grpc_slice trailing_md_string;
@@ -48,11 +49,12 @@ typedef struct call_data {
/* to get notified of the availability of the incoming initial metadata. */
grpc_closure on_initial_md_ready;
grpc_metadata_batch* recv_initial_metadata;
-} call_data;
+};
-typedef struct channel_data {
+struct channel_data {
intptr_t id; /**< an id unique to the channel */
-} channel_data;
+};
+} // namespace
static void on_initial_md_ready(void* user_data, grpc_error* err) {
grpc_call_element* elem = (grpc_call_element*)user_data;
diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc
index 0499c6ecfc..7b86e4cd6c 100644
--- a/src/core/ext/filters/max_age/max_age_filter.cc
+++ b/src/core/ext/filters/max_age/max_age_filter.cc
@@ -37,7 +37,8 @@
#define MAX_CONNECTION_IDLE_INTEGER_OPTIONS \
{ DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX }
-typedef struct channel_data {
+namespace {
+struct channel_data {
/* We take a reference to the channel stack for the timer callback */
grpc_channel_stack* channel_stack;
/* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer
@@ -84,7 +85,8 @@ typedef struct channel_data {
grpc_connectivity_state connectivity_state;
/* Number of active calls */
gpr_atm call_count;
-} channel_data;
+};
+} // namespace
/* Increase the nubmer of active calls. Before the increasement, if there are no
calls, the max_idle_timer should be cancelled. */
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc
index f8487f9a9e..8d76c4a837 100644
--- a/src/core/ext/filters/message_size/message_size_filter.cc
+++ b/src/core/ext/filters/message_size/message_size_filter.cc
@@ -26,7 +26,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
-#include "src/core/lib/support/string.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/service_config.h"
@@ -86,7 +86,8 @@ static void* refcounted_message_size_limits_create_from_json(
return value;
}
-typedef struct call_data {
+namespace {
+struct call_data {
grpc_call_combiner* call_combiner;
message_size_limits limits;
// Receive closures are chained: we inject this closure as the
@@ -97,13 +98,14 @@ typedef struct call_data {
grpc_byte_stream** recv_message;
// Original recv_message_ready callback, invoked after our own.
grpc_closure* next_recv_message_ready;
-} call_data;
+};
-typedef struct channel_data {
+struct channel_data {
message_size_limits limits;
// Maps path names to refcounted_message_size_limits structs.
grpc_slice_hash_table* method_limit_table;
-} channel_data;
+};
+} // namespace
// Callback invoked when we receive a message. Here we check the max
// receive message size.
diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc
index 555a9134a2..88bb8c71cc 100644
--- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc
+++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc
@@ -25,7 +25,8 @@
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/metadata.h"
-typedef struct call_data {
+namespace {
+struct call_data {
// Receive closures are chained: we inject this closure as the
// recv_initial_metadata_ready up-call on transport_stream_op, and remember to
// call our next_recv_initial_metadata_ready member after handling it.
@@ -37,7 +38,8 @@ typedef struct call_data {
// Marks whether the workaround is active
bool workaround_active;
-} call_data;
+};
+} // namespace
// Find the user agent metadata element in the batch
static bool get_user_agent_mdelem(const grpc_metadata_batch* batch,