aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/round_robin
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-06 09:47:54 -0800
committerGravatar GitHub <noreply@github.com>2017-12-06 09:47:54 -0800
commit8cf1470a51ea276ca84825e7495d4ee24743540d (patch)
tree72385cc865094115bc08cb813201d48cb09840bb /src/core/ext/filters/client_channel/lb_policy/round_robin
parent1d4e99508409be052bd129ba507bae1fbe7eb7fa (diff)
Revert "Revert "All instances of exec_ctx being passed around in src/core removed""
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc140
1 files changed, 64 insertions, 76 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index f68daba474..6958b72693 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -154,7 +154,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p,
}
}
-static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void rr_destroy(grpc_lb_policy* pol) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p",
@@ -162,12 +162,12 @@ static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
}
GPR_ASSERT(p->subchannel_list == nullptr);
GPR_ASSERT(p->latest_pending_subchannel_list == nullptr);
- grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ grpc_connectivity_state_destroy(&p->state_tracker);
grpc_subchannel_index_unref();
gpr_free(p);
}
-static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void rr_shutdown_locked(grpc_lb_policy* pol) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
@@ -178,29 +178,27 @@ static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
while ((pp = p->pending_picks) != nullptr) {
p->pending_picks = pp->next;
*pp->target = nullptr;
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error));
gpr_free(pp);
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
- "rr_shutdown");
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_REF(error), "rr_shutdown");
if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
"sl_shutdown_rr_shutdown");
p->subchannel_list = nullptr;
}
if (p->latest_pending_subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->latest_pending_subchannel_list,
- "sl_shutdown_pending_rr_shutdown");
+ p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown");
p->latest_pending_subchannel_list = nullptr;
}
- grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
-static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+static void rr_cancel_pick_locked(grpc_lb_policy* pol,
grpc_connected_subchannel** target,
grpc_error* error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
@@ -210,7 +208,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
pending_pick* next = pp->next;
if (pp->target == target) {
*target = nullptr;
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
gpr_free(pp);
@@ -223,7 +221,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
GRPC_ERROR_UNREF(error);
}
-static void rr_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+static void rr_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
@@ -235,7 +233,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
*pp->target = nullptr;
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete,
+ GRPC_CLOSURE_SCHED(pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
gpr_free(pp);
@@ -248,27 +246,26 @@ static void rr_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
GRPC_ERROR_UNREF(error);
}
-static void start_picking_locked(grpc_exec_ctx* exec_ctx,
- round_robin_lb_policy* p) {
+static void start_picking_locked(round_robin_lb_policy* p) {
p->started_picking = true;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
"connectivity_watch");
grpc_lb_subchannel_data_start_connectivity_watch(
- exec_ctx, &p->subchannel_list->subchannels[i]);
+ &p->subchannel_list->subchannels[i]);
}
}
}
-static void rr_exit_idle_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+static void rr_exit_idle_locked(grpc_lb_policy* pol) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ start_picking_locked(p);
}
}
-static int rr_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+static int rr_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args,
grpc_connected_subchannel** target,
grpc_call_context_element* context, void** user_data,
@@ -305,7 +302,7 @@ static int rr_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
}
/* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ start_picking_locked(p);
}
pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
@@ -348,8 +345,7 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
* (the grpc_lb_subchannel_data associated with the updated subchannel) and the
* subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
* only if the policy transitions to state TRANSIENT_FAILURE. */
-static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx,
- grpc_lb_subchannel_data* sd,
+static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
grpc_error* error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
@@ -382,38 +378,36 @@ static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx,
round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy;
if (subchannel_list->num_ready > 0) {
/* 1) READY */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
/* 2) CONNECTING */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
- "rr_connecting");
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_NONE, "rr_connecting");
} else if (subchannel_list->num_shutdown ==
subchannel_list->num_subchannels) {
/* 3) IDLE and re-resolve */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"rr_exhausted_subchannels+reresolve");
p->started_picking = false;
- grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
GRPC_ERROR_NONE);
} else if (subchannel_list->num_transient_failures ==
subchannel_list->num_subchannels) {
/* 4) TRANSIENT_FAILURE */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
} else if (subchannel_list->num_idle == subchannel_list->num_subchannels) {
/* 5) IDLE */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle");
}
GRPC_ERROR_UNREF(error);
}
-static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg;
round_robin_lb_policy* p =
(round_robin_lb_policy*)sd->subchannel_list->policy;
@@ -431,18 +425,18 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
}
// If the policy is shutting down, unref and return.
if (p->shutdown) {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "rr_shutdown");
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown");
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
+ "rr_shutdown");
return;
}
// If the subchannel list is shutting down, stop watching.
if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_sl_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "rr_sl_shutdown");
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown");
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
+ "rr_sl_shutdown");
return;
}
// If we're still here, the notification must be for a subchannel in
@@ -455,14 +449,13 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Update state counters and new overall state.
update_state_counters_locked(sd);
- update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
+ update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
// If the sd's new state is SHUTDOWN, unref the subchannel.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
- grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
- "rr_connectivity_shutdown");
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown");
grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown");
+ sd->subchannel_list, "rr_connectivity_shutdown");
} else { // sd not in SHUTDOWN
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
if (sd->connected_subchannel == nullptr) {
@@ -490,8 +483,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
}
if (p->subchannel_list != nullptr) {
// dispose of the current subchannel_list
- grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->subchannel_list, "sl_phase_out_shutdown");
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
+ "sl_phase_out_shutdown");
}
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = nullptr;
@@ -523,32 +516,30 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
(void*)p, (void*)selected->subchannel,
(void*)p->subchannel_list, (unsigned long)next_ready_index);
}
- GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
}
static grpc_connectivity_state rr_check_connectivity_locked(
- grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, grpc_error** error) {
+ grpc_lb_policy* pol, grpc_error** error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
return grpc_connectivity_state_get(&p->state_tracker, error);
}
-static void rr_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx,
- grpc_lb_policy* pol,
+static void rr_notify_on_state_change_locked(grpc_lb_policy* pol,
grpc_connectivity_state* current,
grpc_closure* notify) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
- grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
- current, notify);
+ grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+ notify);
}
-static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
- grpc_closure* closure) {
+static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* closure) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
if (next_ready_index < p->subchannel_list->num_subchannels) {
@@ -556,16 +547,15 @@ static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
&p->subchannel_list->subchannels[next_ready_index];
grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
selected->connected_subchannel, "rr_ping");
- grpc_connected_subchannel_ping(exec_ctx, target, closure);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping");
+ grpc_connected_subchannel_ping(target, closure);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping");
} else {
- GRPC_CLOSURE_SCHED(
- exec_ctx, closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected"));
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Round Robin not connected"));
}
}
-static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+static void rr_update_locked(grpc_lb_policy* policy,
const grpc_lb_policy_args* args) {
round_robin_lb_policy* p = (round_robin_lb_policy*)policy;
const grpc_arg* arg =
@@ -576,7 +566,7 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
// Otherwise, keep using the current subchannel list (ignore this update).
if (p->subchannel_list == nullptr) {
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"rr_update_missing");
}
@@ -588,15 +578,15 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
addresses->num_addresses);
}
grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
- exec_ctx, &p->base, &grpc_lb_round_robin_trace, addresses, args,
+ &p->base, &grpc_lb_round_robin_trace, addresses, args,
rr_connectivity_changed_locked);
if (subchannel_list->num_subchannels == 0) {
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"rr_update_empty");
if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
"sl_shutdown_empty_update");
}
p->subchannel_list = subchannel_list; // empty list
@@ -612,7 +602,7 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
(void*)subchannel_list);
}
grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->latest_pending_subchannel_list, "sl_outdated");
+ p->latest_pending_subchannel_list, "sl_outdated");
}
p->latest_pending_subchannel_list = subchannel_list;
for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
@@ -623,22 +613,21 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list,
"connectivity_watch");
grpc_lb_subchannel_data_start_connectivity_watch(
- exec_ctx, &subchannel_list->subchannels[i]);
+ &subchannel_list->subchannels[i]);
}
} else {
// The policy isn't picking yet. Save the update for later, disposing of
// previous version if any.
if (p->subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
- exec_ctx, p->subchannel_list, "rr_update_before_started_picking");
+ p->subchannel_list, "rr_update_before_started_picking");
}
p->subchannel_list = subchannel_list;
}
}
static void rr_set_reresolve_closure_locked(
- grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
- grpc_closure* request_reresolution) {
+ grpc_lb_policy* policy, grpc_closure* request_reresolution) {
round_robin_lb_policy* p = (round_robin_lb_policy*)policy;
GPR_ASSERT(!p->shutdown);
GPR_ASSERT(policy->request_reresolution == nullptr);
@@ -662,8 +651,7 @@ static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}
static void round_robin_factory_unref(grpc_lb_policy_factory* factory) {}
-static grpc_lb_policy* round_robin_create(grpc_exec_ctx* exec_ctx,
- grpc_lb_policy_factory* factory,
+static grpc_lb_policy* round_robin_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
GPR_ASSERT(args->client_channel_factory != nullptr);
round_robin_lb_policy* p = (round_robin_lb_policy*)gpr_zalloc(sizeof(*p));
@@ -671,7 +659,7 @@ static grpc_lb_policy* round_robin_create(grpc_exec_ctx* exec_ctx,
grpc_subchannel_index_ref();
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
- rr_update_locked(exec_ctx, &p->base, args);
+ rr_update_locked(&p->base, args);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void*)p,
(unsigned long)p->subchannel_list->num_subchannels);