aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/client_config/client_channel.c27
-rw-r--r--src/core/ext/client_config/lb_policy.c3
-rw-r--r--src/core/ext/client_config/lb_policy.h6
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c27
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c13
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c11
6 files changed, 32 insertions, 55 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 61e012578e..b5b6cc4df0 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -456,10 +456,16 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- call_data *calld = arg;
+ grpc_call_element *elem = arg;
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
gpr_mu_lock(&calld->mu);
GPR_ASSERT(calld->creation_phase ==
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
+ // gpr_mu_lock(&chand->mu);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
+ chand->interested_parties);
+ // gpr_mu_unlock(&chand->mu);
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (calld->connected_subchannel == NULL) {
gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
@@ -536,7 +542,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
GPR_TIMER_BEGIN("pick_subchannel", 0);
channel_data *chand = elem->channel_data;
- call_data *calld = elem->call_data;
continue_picking_args *cpa;
grpc_closure *closure;
@@ -566,9 +571,9 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
int r;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
gpr_mu_unlock(&chand->mu);
- r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent,
- initial_metadata, initial_metadata_flags,
- connected_subchannel, on_ready);
+ r = grpc_lb_policy_pick(exec_ctx, lb_policy, initial_metadata,
+ initial_metadata_flags, connected_subchannel,
+ on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
return r;
@@ -608,6 +613,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld);
@@ -668,13 +674,22 @@ retry:
calld->connected_subchannel == NULL &&
op->send_initial_metadata != NULL) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
- grpc_closure_init(&calld->next_step, subchannel_ready, calld);
+ grpc_closure_init(&calld->next_step, subchannel_ready, elem);
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
+ // grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
+ // chand->interested_parties);
if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags,
&calld->connected_subchannel, &calld->next_step)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ // grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
+ // chand->interested_parties);
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
+ } else {
+ // gpr_mu_lock(&chand->mu);
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
+ chand->interested_parties);
+ // gpr_mu_unlock(&chand->mu);
}
}
/* if we've got a subchannel, then let's ask it to create a call */
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 8b980b2cca..0e02de04ed 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -100,12 +100,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
}
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
- return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata,
+ return policy->vtable->pick(exec_ctx, policy, initial_metadata,
initial_metadata_flags, target, on_complete);
}
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index a2f5446fc6..b92a73c09a 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -35,7 +35,6 @@
#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H
#include "src/core/ext/client_config/subchannel.h"
-#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which
@@ -60,7 +59,6 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
@@ -127,9 +125,9 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
/** Given initial metadata in \a initial_metadata, find an appropriate
target for this rpc, and 'return' it by calling \a on_complete after setting
\a target.
- Picking can be asynchronous. Any IO should be done under \a pollent. */
+ Picking can be asynchronous. Any IO should be done under the pollset_set
+ interested_parties. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index af913d8a9d..515d379dbb 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -158,9 +158,6 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
typedef struct pending_pick {
struct pending_pick *next;
- /* polling entity for the pick()'s async notification */
- grpc_polling_entity *pollent;
-
/* the initial metadata for the pick. See grpc_lb_policy_pick() */
grpc_metadata_batch *initial_metadata;
@@ -180,7 +177,7 @@ typedef struct pending_pick {
wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
-static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
+static void add_pending_pick(pending_pick **root,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
@@ -189,7 +186,6 @@ static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
memset(pp, 0, sizeof(pending_pick));
memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
pp->next = *root;
- pp->pollent = pollent;
pp->target = target;
pp->initial_metadata = initial_metadata;
pp->initial_metadata_flags = initial_metadata_flags;
@@ -359,9 +355,9 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent,
- pp->initial_metadata, pp->initial_metadata_flags,
- pp->target, &pp->wrapped_on_complete);
+ grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->initial_metadata,
+ pp->initial_metadata_flags, pp->target,
+ &pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp;
}
@@ -541,8 +537,6 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_polling_entity_del_from_pollset_set(
- exec_ctx, pp->pollent, glb_policy->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
@@ -572,8 +566,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_polling_entity_del_from_pollset_set(
- exec_ctx, pp->pollent, glb_policy->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
gpr_free(pp);
@@ -603,7 +595,6 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
@@ -623,8 +614,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
glb_policy->wc_arg.wrapped_closure = on_complete;
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
&glb_policy->wc_arg);
- r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
- initial_metadata, initial_metadata_flags, target,
+ r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, initial_metadata,
+ initial_metadata_flags, target,
&glb_policy->wrapped_on_complete);
if (r != 0) {
/* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
@@ -639,9 +630,7 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_ERROR_NONE, NULL);
}
} else {
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
- glb_policy->base.interested_parties);
- add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata,
+ add_pending_pick(&glb_policy->pending_picks, initial_metadata,
initial_metadata_flags, target, on_complete);
if (!glb_policy->started_picking) {
@@ -769,7 +758,7 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
/* Note the following LB call progresses every time there's activity in \a
* glb_policy->base.interested_parties, which is comprised of the polling
- * entities passed to glb_pick(). */
+ * entities from client_channel. */
lb_client->lb_call = grpc_channel_create_pollset_set_call(
glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties, "/BalanceLoad",
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9decf70692..a1a438c1df 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -39,7 +39,6 @@
typedef struct pending_pick {
struct pending_pick *next;
- grpc_polling_entity *pollent;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
@@ -119,8 +118,6 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
pp = next;
@@ -137,8 +134,6 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
@@ -164,8 +159,6 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
gpr_free(pp);
@@ -199,7 +192,6 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
@@ -225,11 +217,8 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
- p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pollent;
pp->target = target;
pp->initial_metadata_flags = initial_metadata_flags;
pp->on_complete = on_complete;
@@ -315,8 +304,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 7bcf608ab9..9b557f423e 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -76,7 +76,6 @@ int grpc_lb_round_robin_trace = 0;
* Once a pick is available, \a target is updated and \a on_complete called. */
typedef struct pending_pick {
struct pending_pick *next;
- grpc_polling_entity *pollent;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
@@ -290,8 +289,6 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
NULL);
@@ -317,8 +314,6 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
*pp->target = NULL;
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
NULL);
@@ -361,7 +356,6 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
@@ -385,11 +379,8 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
- p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pollent;
pp->target = target;
pp->on_complete = on_complete;
pp->initial_metadata_flags = initial_metadata_flags;
@@ -440,8 +431,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}