aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-10-18 09:45:45 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-10-18 09:45:45 -0700
commit2a00d3d1bca70096451370102afec25faa33edb8 (patch)
tree1121d9f2e16b306ce372cd16e24bc09aa26b3828 /src/core/ext
parentc37a8a56df0b2dd64f802fee4d4da7d5552212e1 (diff)
parent21fbe2016c173edd3cb8943138cabf8f097cb62a (diff)
Merge branch 'master' into rpc_mgr
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/client_config/client_channel.c22
-rw-r--r--src/core/ext/client_config/lb_policy.h6
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c186
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c12
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c14
-rw-r--r--src/core/ext/load_reporting/load_reporting.h20
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c4
7 files changed, 133 insertions, 131 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index a6056c3e8d..cbf79afa17 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -513,10 +513,14 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- call_data *calld = arg;
+ grpc_call_element *elem = arg;
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
gpr_mu_lock(&calld->mu);
GPR_ASSERT(calld->creation_phase ==
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
+ chand->interested_parties);
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (calld->connected_subchannel == NULL) {
gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
@@ -564,6 +568,9 @@ typedef struct {
grpc_closure closure;
} continue_picking_args;
+/** Return true if subchannel is available immediately (in which case on_ready
+ should not be called), or false otherwise (in which case on_ready should be
+ called when the subchannel is available). */
static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
@@ -629,8 +636,8 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
gpr_mu_unlock(&chand->mu);
// TODO(dgq): make this deadline configurable somehow.
const grpc_lb_policy_pick_args inputs = {
- calld->pollent, initial_metadata, initial_metadata_flags,
- &calld->lb_token_mdelem, gpr_inf_future(GPR_CLOCK_MONOTONIC)};
+ initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC)};
r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
@@ -672,6 +679,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
/* try to (atomically) get the call */
@@ -739,14 +747,20 @@ retry:
calld->connected_subchannel == NULL &&
op->send_initial_metadata != NULL) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
- grpc_closure_init(&calld->next_step, subchannel_ready, calld);
+ grpc_closure_init(&calld->next_step, subchannel_ready, elem);
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
+ /* If a subchannel is not available immediately, the polling entity from
+ call_data should be provided to channel_data's interested_parties, so
+ that IO of the lb_policy and resolver could be done under it. */
if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags,
&calld->connected_subchannel, &calld->next_step,
GRPC_ERROR_NONE)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
+ } else {
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
+ chand->interested_parties);
}
}
/* if we've got a subchannel, then let's ask it to create a call */
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 110d08fcac..de424cd105 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -35,7 +35,6 @@
#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H
#include "src/core/ext/client_config/subchannel.h"
-#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which
@@ -55,8 +54,6 @@ struct grpc_lb_policy {
/** Extra arguments for an LB pick */
typedef struct grpc_lb_policy_pick_args {
- /** Parties interested in the pick's progress */
- grpc_polling_entity *pollent;
/** Initial metadata associated with the picking call. */
grpc_metadata_batch *initial_metadata;
/** Bitmask used for selective cancelling. See \a
@@ -153,7 +150,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
once the pick is complete with its error argument set to indicate
success or failure.
- Any I/O should be done under \a pick_args->pollent. */
+ Any IO should be done under the \a interested_parties \a grpc_pollset_set
+ in the \a grpc_lb_policy struct. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 626f285b90..9af92b787d 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -69,8 +69,8 @@
* possible scenarios:
*
* 1. This is the first server list received. There was no previous instance of
- * the Round Robin policy. \a rr_handover() will instantiate the RR policy
- * and perform all the pending operations over it.
+ * the Round Robin policy. \a rr_handover_locked() will instantiate the RR
+ * policy and perform all the pending operations over it.
* 2. There's already a RR policy instance active. We need to introduce the new
* one build from the new serverlist, but taking care not to disrupt the
* operations in progress over the old RR instance. This is done by
@@ -78,7 +78,7 @@
* references are held on the old RR policy, it'll be destroyed and \a
* glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
* state. At this point we can transition to a new RR instance safely, which
- * is done once again via \a rr_handover().
+ * is done once again via \a rr_handover_locked().
*
*
* Once a RR policy instance is in place (and getting updated as described),
@@ -86,8 +86,8 @@
* forwarding them to the RR instance. Any time there's no RR policy available
* (ie, right after the creation of the gRPCLB policy, if an empty serverlist
* is received, etc), pick/ping requests are added to a list of pending
- * picks/pings to be flushed and serviced as part of \a rr_handover() the moment
- * the RR policy instance becomes available.
+ * picks/pings to be flushed and serviced as part of \a rr_handover_locked() the
+ * moment the RR policy instance becomes available.
*
* \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
* high level design and details. */
@@ -134,6 +134,9 @@ static void initial_metadata_add_lb_token(
}
typedef struct wrapped_rr_closure_arg {
+ /* the closure instance using this struct as argument */
+ grpc_closure wrapper_closure;
+
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */
grpc_closure *wrapped_closure;
@@ -155,9 +158,8 @@ typedef struct wrapped_rr_closure_arg {
/* The RR instance related to the closure */
grpc_lb_policy *rr_policy;
- /* when not NULL, represents a pending_{pick,ping} node to be freed upon
- * closure execution */
- void *owning_pending_node; /* to be freed if not NULL */
+ /* heap memory to be freed upon closure execution. */
+ void *free_when_done;
} wrapped_rr_closure_arg;
/* The \a on_complete closure passed as part of the pick requires keeping a
@@ -183,10 +185,10 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
}
}
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
-
grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
NULL);
- gpr_free(wc_arg->owning_pending_node);
+ GPR_ASSERT(wc_arg->free_when_done != NULL);
+ gpr_free(wc_arg->free_when_done);
}
/* Linked list of pending pick requests. It stores all information needed to
@@ -207,10 +209,6 @@ typedef struct pending_pick {
* upon error. */
grpc_connected_subchannel **target;
- /* a closure wrapping the original on_complete one to be invoked once the
- * pick() has completed (regardless of success) */
- grpc_closure wrapped_on_complete;
-
/* args for wrapped_on_complete */
wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
@@ -230,8 +228,9 @@ static void add_pending_pick(pending_pick **root,
pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
pick_args->lb_token_mdelem_storage;
- grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
- &pp->wrapped_on_complete_arg);
+ pp->wrapped_on_complete_arg.free_when_done = pp;
+ grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
+ wrapped_rr_closure, &pp->wrapped_on_complete_arg);
*root = pp;
}
@@ -239,10 +238,6 @@ static void add_pending_pick(pending_pick **root,
typedef struct pending_ping {
struct pending_ping *next;
- /* a closure wrapping the original on_complete one to be invoked once the
- * ping() has completed (regardless of success) */
- grpc_closure wrapped_notify;
-
/* args for wrapped_notify */
wrapped_rr_closure_arg wrapped_notify_arg;
} pending_ping;
@@ -251,10 +246,11 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
pending_ping *pping = gpr_malloc(sizeof(*pping));
memset(pping, 0, sizeof(pending_ping));
memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
- pping->next = *root;
- grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure,
- &pping->wrapped_notify_arg);
pping->wrapped_notify_arg.wrapped_closure = notify;
+ pping->wrapped_notify_arg.free_when_done = pping;
+ pping->next = *root;
+ grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
+ wrapped_rr_closure, &pping->wrapped_notify_arg);
*root = pping;
}
@@ -307,13 +303,6 @@ typedef struct glb_lb_policy {
/** for tracking of the RR connectivity */
rr_connectivity_data *rr_connectivity;
-
- /* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily
- * available RR picks */
- grpc_closure wrapped_on_complete;
-
- /* arguments for the wrapped_on_complete closure */
- wrapped_rr_closure_arg wc_arg;
} glb_lb_policy;
/* Keeps track and reacts to changes in connectivity of the RR instance */
@@ -399,14 +388,14 @@ static grpc_lb_addresses *process_serverlist(
GPR_ARRAY_SIZE(server->load_balance_token) - 1;
grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
(uint8_t *)server->load_balance_token, lb_token_size);
- user_data = grpc_mdelem_from_metadata_strings(
- GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
+ user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN,
+ lb_token_mdstr);
} else {
gpr_log(GPR_ERROR,
"Missing LB token for backend address '%s'. The empty token will "
"be used instead",
grpc_sockaddr_to_uri((struct sockaddr *)&addr.addr));
- user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
+ user_data = GRPC_MDELEM_LB_TOKEN_EMPTY;
}
grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
@@ -424,9 +413,43 @@ static void lb_token_destroy(void *token) {
if (token != NULL) GRPC_MDELEM_UNREF(token);
}
-static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
- const grpc_grpclb_serverlist *serverlist,
- glb_lb_policy *glb_policy) {
+/* perform a pick over \a rr_policy. Given that a pick can return immediately
+ * (ignoring its completion callback) we need to perform the cleanups this
+ * callback would be otherwise resposible for */
+static bool pick_from_internal_rr_locked(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
+ GPR_ASSERT(rr_policy != NULL);
+ const bool pick_done =
+ grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target,
+ (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
+ if (pick_done) {
+ /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
+ (intptr_t)wc_arg->rr_policy);
+ }
+ GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick");
+
+ /* add the load reporting initial metadata */
+ initial_metadata_add_lb_token(pick_args->initial_metadata,
+ pick_args->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(wc_arg->lb_token));
+
+ gpr_free(wc_arg);
+ }
+ /* else, the pending pick will be registered and taken care of by the
+ * pending pick list inside the RR policy (glb_policy->rr_policy).
+ * Eventually, wrapped_on_complete will be called, which will -among other
+ * things- add the LB token to the call's initial metadata */
+
+ return pick_done;
+}
+
+static grpc_lb_policy *create_rr_locked(
+ grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist,
+ glb_lb_policy *glb_policy) {
GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
grpc_lb_policy_args args;
@@ -446,18 +469,21 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
return rr;
}
-static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
- grpc_error *error) {
+static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy, grpc_error *error) {
GPR_ASSERT(glb_policy->serverlist != NULL &&
glb_policy->serverlist->num_servers > 0);
glb_policy->rr_policy =
- create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
+ create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
(intptr_t)glb_policy->rr_policy);
}
GPR_ASSERT(glb_policy->rr_policy != NULL);
+ grpc_pollset_set_add_pollset_set(exec_ctx,
+ glb_policy->rr_policy->interested_parties,
+ glb_policy->base.interested_parties);
glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity(
exec_ctx, glb_policy->rr_policy, &error);
grpc_lb_policy_notify_on_state_change(
@@ -478,11 +504,9 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args,
- pp->target,
- (void **)&pp->wrapped_on_complete_arg.lb_token,
- &pp->wrapped_on_complete);
- pp->wrapped_on_complete_arg.owning_pending_node = pp;
+ pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
+ &pp->pick_args, pp->target,
+ &pp->wrapped_on_complete_arg);
}
pending_ping *pping;
@@ -495,8 +519,7 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
(intptr_t)glb_policy->rr_policy);
}
grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
- &pping->wrapped_notify);
- pping->wrapped_notify_arg.owning_pending_node = pping;
+ &pping->wrapped_notify_arg.wrapper_closure);
}
}
@@ -509,13 +532,16 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (glb_policy->serverlist != NULL) {
/* a RR policy is shutting down but there's a serverlist available ->
* perform a handover */
- rr_handover(exec_ctx, glb_policy, error);
+ gpr_mu_lock(&glb_policy->mu);
+ rr_handover_locked(exec_ctx, glb_policy, error);
+ gpr_mu_unlock(&glb_policy->mu);
} else {
/* shutting down and no new serverlist available. Bail out. */
gpr_free(rr_conn_data);
}
} else {
if (error == GRPC_ERROR_NONE) {
+ gpr_mu_lock(&glb_policy->mu);
/* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
rr_conn_data->state, GRPC_ERROR_REF(error),
@@ -524,6 +550,7 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state,
&rr_conn_data->on_change);
+ gpr_mu_unlock(&glb_policy->mu);
} else { /* error */
gpr_free(rr_conn_data);
}
@@ -648,15 +675,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_ERROR_NONE, NULL);
pp = next;
}
while (pping != NULL) {
pending_ping *next = pping->next;
- grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE,
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_ERROR_NONE, NULL);
pping = next;
}
@@ -686,11 +713,9 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_polling_entity_del_from_pollset_set(
- exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(
- exec_ctx, &pp->wrapped_on_complete,
+ exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
} else {
pp->next = glb_policy->pending_picks;
@@ -719,10 +744,8 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_polling_entity_del_from_pollset_set(
- exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
grpc_exec_ctx_sched(
- exec_ctx, &pp->wrapped_on_complete,
+ exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
} else {
pp->next = glb_policy->pending_picks;
@@ -775,39 +798,20 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
(intptr_t)glb_policy->rr_policy);
}
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
- memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
- glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
- glb_policy->wc_arg.target = target;
- glb_policy->wc_arg.wrapped_closure = on_complete;
- glb_policy->wc_arg.lb_token_mdelem_storage =
- pick_args->lb_token_mdelem_storage;
- glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
- glb_policy->wc_arg.owning_pending_node = NULL;
- grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
- &glb_policy->wc_arg);
-
- pick_done =
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
- (void **)&glb_policy->wc_arg.lb_token,
- &glb_policy->wrapped_on_complete);
- if (pick_done) {
- /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
- if (grpc_lb_glb_trace) {
- gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
- (intptr_t)glb_policy->wc_arg.rr_policy);
- }
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
- /* add the load reporting initial metadata */
- initial_metadata_add_lb_token(
- pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
- GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
- }
+ wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
+ memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg));
+
+ grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg);
+ wc_arg->rr_policy = glb_policy->rr_policy;
+ wc_arg->target = target;
+ wc_arg->wrapped_closure = on_complete;
+ wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
+ wc_arg->initial_metadata = pick_args->initial_metadata;
+ wc_arg->free_when_done = wc_arg;
+ pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
+ pick_args, target, wc_arg);
} else {
- /* else, the pending pick will be registered and taken care of by the
- * pending pick list inside the RR policy (glb_policy->rr_policy) */
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
- glb_policy->base.interested_parties);
add_pending_pick(&glb_policy->pending_picks, pick_args, target,
on_complete);
@@ -931,7 +935,7 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
/* Note the following LB call progresses every time there's activity in \a
* glb_policy->base.interested_parties, which is comprised of the polling
- * entities passed to glb_pick(). */
+ * entities from \a client_channel. */
lb_client->lb_call = grpc_channel_create_pollset_set_call(
glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties,
@@ -1076,6 +1080,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* update serverlist */
if (serverlist->num_servers > 0) {
+ gpr_mu_lock(&lb_client->glb_policy->mu);
if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist,
serverlist)) {
if (grpc_lb_glb_trace) {
@@ -1093,7 +1098,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
if (lb_client->glb_policy->rr_policy == NULL) {
/* initial "handover", in this case from a null RR policy, meaning
* it'll just create the first RR policy instance */
- rr_handover(exec_ctx, lb_client->glb_policy, error);
+ rr_handover_locked(exec_ctx, lb_client->glb_policy, error);
} else {
/* unref the RR policy, eventually leading to its substitution with a
* new one constructed from the received serverlist (see
@@ -1101,6 +1106,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
"serverlist_received");
}
+ gpr_mu_unlock(&lb_client->glb_policy->mu);
} else {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 961a0c9b19..6533327343 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -39,7 +39,6 @@
typedef struct pending_pick {
struct pending_pick *next;
- grpc_polling_entity *pollent;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
@@ -119,8 +118,6 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
pp = next;
@@ -138,8 +135,6 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(
exec_ctx, pp->on_complete,
@@ -168,8 +163,6 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
grpc_exec_ctx_sched(
exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
@@ -229,11 +222,8 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
- p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pick_args->pollent;
pp->target = target;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete;
@@ -319,8 +309,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 930fa86aca..9bd3f9da24 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -78,9 +78,6 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick {
struct pending_pick *next;
- /* polling entity for the pick()'s async notification */
- grpc_polling_entity *pollent;
-
/* output argument where to store the pick()ed user_data. It'll be NULL if no
* such data is present or there's an error (the definite test for errors is
* \a target being NULL). */
@@ -318,8 +315,6 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(
exec_ctx, pp->on_complete,
@@ -348,8 +343,6 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
*pp->target = NULL;
grpc_exec_ctx_sched(
exec_ctx, pp->on_complete,
@@ -403,7 +396,6 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
/* readily available, report right away */
- gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (user_data != NULL) {
@@ -416,17 +408,15 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
/* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p);
+ gpr_mu_unlock(&p->mu);
return 1;
} else {
/* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
- p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pick_args->pollent;
pp->target = target;
pp->on_complete = on_complete;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
@@ -482,8 +472,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
(void *)selected->subchannel, (void *)selected);
}
- grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
- p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
diff --git a/src/core/ext/load_reporting/load_reporting.h b/src/core/ext/load_reporting/load_reporting.h
index e37817d8c2..e13097654d 100644
--- a/src/core/ext/load_reporting/load_reporting.h
+++ b/src/core/ext/load_reporting/load_reporting.h
@@ -37,13 +37,21 @@
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/channel/channel_stack.h"
-/** Metadata key for initial metadata coming from clients */
-/* TODO(dgq): change to the final value TBD */
-#define GRPC_LOAD_REPORTING_INITIAL_MD_KEY "load-reporting-initial"
+/** Metadata key for the gRPC LB load balancer token.
+ *
+ * The value corresponding to this key is an opaque token that is given to the
+ * frontend as part of each pick; the frontend sends this token to the backend
+ * in each request it sends when using that pick. The token is used by the
+ * backend to verify the request and to allow the backend to report load to the
+ * gRPC LB system. */
+#define GRPC_LB_TOKEN_MD_KEY "lb-token"
-/** Metadata key for trailing metadata from servers */
-/* TODO(dgq): change to the final value TBD */
-#define GRPC_LOAD_REPORTING_TRAILING_MD_KEY "load-reporting-trailing"
+/** Metadata key for gRPC LB cost reporting.
+ *
+ * The value corresponding to this key is an opaque binary blob reported by the
+ * backend as part of its trailing metadata containing cost information for the
+ * call. */
+#define GRPC_LB_COST_MD_KEY "lb-cost"
/** Identifiers for the invocation point of the users LR callback */
typedef enum grpc_load_reporting_source {
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index 394f0cb832..22bf36367f 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -75,7 +75,7 @@ static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) {
if (md->key == GRPC_MDSTR_PATH) {
calld->service_method = grpc_mdstr_as_c_string(md->value);
- } else if (md->key == GRPC_MDSTR_LOAD_REPORTING_INITIAL) {
+ } else if (md->key == GRPC_MDSTR_LB_TOKEN) {
calld->initial_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
return NULL;
}
@@ -193,7 +193,7 @@ static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- if (md->key == GRPC_MDSTR_LOAD_REPORTING_TRAILING) {
+ if (md->key == GRPC_MDSTR_LB_COST) {
calld->trailing_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
return NULL;
}