path: root/src/core/ext
author Mark D. Roth <roth@google.com> 2016-09-16 08:51:01 -0700
committer Mark D. Roth <roth@google.com> 2016-09-16 08:51:01 -0700
commit c5c38782ecc885c372b4337f69c6d27a5c51dd42 (patch)
tree 693406a1020ba941ef91eadf8f1b16bb3a77cc5a /src/core/ext
parent 46131d849efa3fabace12e1dcd2c17d9f1fee35e (diff)
parent d2f7268bc702d0bf2083b9dedb2ec07544b83dd2 (diff)
Merge remote-tracking branch 'upstream/master' into grpclb_resolver_changes
Diffstat (limited to 'src/core/ext')
-rw-r--r-- src/core/ext/client_config/client_channel.c 10
-rw-r--r-- src/core/ext/client_config/lb_policy.c 10
-rw-r--r-- src/core/ext/client_config/lb_policy.h 60
-rw-r--r-- src/core/ext/client_config/lb_policy_factory.c 37
-rw-r--r-- src/core/ext/client_config/lb_policy_factory.h 37
-rw-r--r-- src/core/ext/client_config/resolver_result.c 24
-rw-r--r-- src/core/ext/client_config/resolver_result.h 29
-rw-r--r-- src/core/ext/lb_policy/grpclb/grpclb.c 344
-rw-r--r-- src/core/ext/lb_policy/grpclb/load_balancer_api.c 5
-rw-r--r-- src/core/ext/lb_policy/grpclb/load_balancer_api.h 1
-rw-r--r-- src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c 8
-rw-r--r-- src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h 30
-rw-r--r-- src/core/ext/lb_policy/pick_first/pick_first.c 17
-rw-r--r-- src/core/ext/lb_policy/round_robin/round_robin.c 89
-rw-r--r-- src/core/ext/resolver/dns/native/dns_resolver.c 10
-rw-r--r-- src/core/ext/resolver/sockaddr/sockaddr_resolver.c 9
-rw-r--r-- src/core/ext/transport/cronet/transport/cronet_transport.c 111
17 files changed, 556 insertions, 275 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 61e012578e..58ef629be0 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -394,6 +394,8 @@ typedef struct client_channel_call_data {
grpc_closure next_step;
grpc_call_stack *owning_call;
+
+ grpc_linked_mdelem lb_token_mdelem;
} call_data;
static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
@@ -566,9 +568,11 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
int r;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
gpr_mu_unlock(&chand->mu);
- r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent,
- initial_metadata, initial_metadata_flags,
- connected_subchannel, on_ready);
+ const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
+ initial_metadata_flags,
+ &calld->lb_token_mdelem};
+ r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
+ NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
return r;
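The hunk above shows the new calling convention: pick inputs that used to travel as separate parameters are now bundled in a stack-allocated grpc_lb_policy_pick_args. A minimal sketch of a caller, assuming the in-tree lb_policy.h declarations (pick_example itself is hypothetical):

#include "src/core/ext/client_config/lb_policy.h"

/* Hypothetical caller illustrating the bundled-arguments convention. */
static int pick_example(grpc_exec_ctx *exec_ctx, grpc_lb_policy *lb_policy,
                        grpc_polling_entity *pollent,
                        grpc_metadata_batch *initial_metadata,
                        uint32_t initial_metadata_flags,
                        grpc_linked_mdelem *lb_token_storage,
                        grpc_connected_subchannel **target,
                        grpc_closure *on_ready) {
  /* All pick inputs travel together in one const struct. */
  const grpc_lb_policy_pick_args pick_args = {
      pollent, initial_metadata, initial_metadata_flags, lb_token_storage};
  /* The user_data output is optional: pass NULL when the caller has no use
   * for LB-policy-provided data, as client_channel.c does above. */
  return grpc_lb_policy_pick(exec_ctx, lb_policy, &pick_args, target,
                             NULL /* user_data */, on_ready);
}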
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 8b980b2cca..903563ef6b 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -100,13 +100,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
}
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
- return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata,
- initial_metadata_flags, target, on_complete);
+ return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data,
+ on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index a2f5446fc6..37c93d707c 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -53,23 +53,38 @@ struct grpc_lb_policy {
grpc_pollset_set *interested_parties;
};
+/** Extra arguments for an LB pick */
+typedef struct grpc_lb_policy_pick_args {
+ /** Parties interested in the pick's progress */
+ grpc_polling_entity *pollent;
+ /** Initial metadata associated with the picking call. */
+ grpc_metadata_batch *initial_metadata;
+ /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
+ uint32_t initial_metadata_flags;
+ /** Storage for LB token in \a initial_metadata, or NULL if not used */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+} grpc_lb_policy_pick_args;
+
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
-
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
- /** implement grpc_lb_policy_pick */
+ /** \see grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target, grpc_closure *on_complete);
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
+ grpc_closure *on_complete);
+
+ /** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+
+ /** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
+ /** \see grpc_lb_policy_ping_one */
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@@ -83,8 +98,7 @@ struct grpc_lb_policy_vtable {
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy. Calling with a NULL \a
- state cancels the subscription.
- */
+ state cancels the subscription. */
void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
grpc_connectivity_state *state,
@@ -124,26 +138,34 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
-/** Given initial metadata in \a initial_metadata, find an appropriate
- target for this rpc, and 'return' it by calling \a on_complete after setting
- \a target.
- Picking can be asynchronous. Any IO should be done under \a pollent. */
+/** Find an appropriate target for this call, based on \a pick_args.
+ Picking can be synchronous or asynchronous. In the synchronous case, when a
+ pick is readily available, it'll be returned in \a target and a non-zero
+ value will be returned.
+ In the asynchronous case, zero is returned and \a on_complete will be called
+ once \a target and \a user_data have been set. Any IO should be done under
+ \a pick_args->pollent. The opaque \a user_data output argument corresponds
+ to information that may need to be propagated from the LB policy. It may be
+ NULL. Errors are signaled by receiving a NULL \a *target. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete);
+/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
+ against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
+/** Cancel picks for \a target.
+ The \a on_complete callback of the pending picks will be invoked with \a
+ *target set to NULL. */
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
-/** Cancel all pending picks which have:
- (initial_metadata_flags & initial_metadata_flags_mask) ==
- initial_metadata_flags_eq */
+/** Cancel all pending picks whose \a initial_metadata_flags (as given
+ in the call to \a grpc_lb_policy_pick) match \a initial_metadata_flags_eq
+ when AND'd with \a initial_metadata_flags_mask */
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
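The cancellation rule documented above reduces to a single mask-and-compare. An illustrative helper (not part of the patch) capturing the predicate each policy's cancel_picks applies to its pending picks:

#include <stdbool.h>
#include <stdint.h>

/* A pending pick is cancelled iff its flags, AND'd with the mask, equal
 * the given value. */
static bool pick_matches_cancellation(uint32_t initial_metadata_flags,
                                      uint32_t initial_metadata_flags_mask,
                                      uint32_t initial_metadata_flags_eq) {
  return (initial_metadata_flags & initial_metadata_flags_mask) ==
         initial_metadata_flags_eq;
}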
diff --git a/src/core/ext/client_config/lb_policy_factory.c b/src/core/ext/client_config/lb_policy_factory.c
index 70e46ef3cf..e5e416ced9 100644
--- a/src/core/ext/client_config/lb_policy_factory.c
+++ b/src/core/ext/client_config/lb_policy_factory.c
@@ -31,8 +31,45 @@
*
*/
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
#include "src/core/ext/client_config/lb_policy_factory.h"
+grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses) {
+ grpc_lb_addresses* addresses = gpr_malloc(sizeof(grpc_lb_addresses));
+ addresses->num_addresses = num_addresses;
+ const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses;
+ addresses->addresses = gpr_malloc(addresses_size);
+ memset(addresses->addresses, 0, addresses_size);
+ return addresses;
+}
+
+void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
+ void* address, size_t address_len,
+ bool is_balancer, char* balancer_name,
+ void* user_data) {
+ GPR_ASSERT(index < addresses->num_addresses);
+ grpc_lb_address* target = &addresses->addresses[index];
+ memcpy(target->address.addr, address, address_len);
+ target->address.len = address_len;
+ target->is_balancer = is_balancer;
+ target->balancer_name = balancer_name;
+ target->user_data = user_data;
+}
+
+void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses,
+ void (*user_data_destroy)(void*)) {
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ gpr_free(addresses->addresses[i].balancer_name);
+ if (user_data_destroy != NULL)
+ user_data_destroy(addresses->addresses[i].user_data);
+ }
+ gpr_free(addresses->addresses);
+ gpr_free(addresses);
+}
+
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
factory->vtable->ref(factory);
}
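The three functions above define the grpc_lb_addresses lifecycle. A minimal sketch of a caller, assuming the declarations from lb_policy_factory.h; build_addresses_example and destroy_user_data are hypothetical stand-ins for resolver- or policy-specific code:

#include <grpc/support/string_util.h> /* gpr_strdup */
#include "src/core/ext/client_config/lb_policy_factory.h"

static void destroy_user_data(void *user_data) {
  (void)user_data; /* policy-specific teardown would go here */
}

static grpc_lb_addresses *build_addresses_example(const void *addr,
                                                  size_t addr_len,
                                                  void *user_data) {
  grpc_lb_addresses *addresses = grpc_lb_addresses_create(1);
  /* set_address copies the sockaddr bytes and takes ownership of
   * balancer_name, hence the gpr_strdup. */
  grpc_lb_addresses_set_address(addresses, 0, (void *)addr, addr_len,
                                true /* is_balancer */,
                                gpr_strdup("lb.example.com"), user_data);
  return addresses;
}

/* ...later: grpc_lb_addresses_destroy(addresses, destroy_user_data); */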
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index 6919b1eb98..41db806241 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -47,8 +47,43 @@ struct grpc_lb_policy_factory {
const grpc_lb_policy_factory_vtable *vtable;
};
+/** A resolved address alongside any LB related information associated with it.
+ * \a user_data, if not NULL, contains opaque data meant to be consumed by the
+ * gRPC LB policy. Note that not all LB policies support \a user_data as
+ * input. Those that don't will simply ignore it and will correspondingly
+ * return NULL in the pick() output argument of the same name. */
+typedef struct grpc_lb_address {
+ grpc_resolved_address address;
+ bool is_balancer;
+ char *balancer_name; /* For secure naming. */
+ void *user_data;
+} grpc_lb_address;
+
+typedef struct grpc_lb_addresses {
+ size_t num_addresses;
+ grpc_lb_address *addresses;
+} grpc_lb_addresses;
+
+/** Returns a grpc_lb_addresses struct with enough space for
+ * \a num_addresses addresses. */
+grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses);
+
+/** Sets the value of the address at index \a index of \a addresses.
+ * \a address is a socket address of length \a address_len.
+ * Takes ownership of \a balancer_name. */
+void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
+ void *address, size_t address_len,
+ bool is_balancer, char* balancer_name,
+ void *user_data);
+
+/** Destroys \a addresses. If \a user_data_destroy is not NULL, it will
+ * be invoked to destroy the \a user_data field of each address. */
+void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses,
+ void (*user_data_destroy)(void*));
+
+/** Arguments passed to LB policies. */
typedef struct grpc_lb_policy_args {
- grpc_addresses *addresses;
+ grpc_lb_addresses *addresses;
grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c
index 68f6f5bfb6..242989a0da 100644
--- a/src/core/ext/client_config/resolver_result.c
+++ b/src/core/ext/client_config/resolver_result.c
@@ -35,30 +35,6 @@
#include <grpc/support/alloc.h>
-grpc_addresses* grpc_addresses_create(size_t num_addresses) {
- grpc_addresses* addresses = gpr_malloc(sizeof(grpc_addresses));
- addresses->num_addresses = num_addresses;
- const size_t addresses_size = sizeof(grpc_address) * num_addresses;
- addresses->addresses = gpr_malloc(addresses_size);
- memset(addresses->addresses, 0, addresses_size);
- return addresses;
-}
-
-void grpc_addresses_set_address(grpc_addresses* addresses, size_t index,
- void* address, size_t address_len,
- bool is_balancer) {
- GPR_ASSERT(index < addresses->num_addresses);
- grpc_address* target = &addresses->addresses[index];
- memcpy(target->address.addr, address, address_len);
- target->address.len = address_len;
- target->is_balancer = is_balancer;
-}
-
-void grpc_addresses_destroy(grpc_addresses* addresses) {
- gpr_free(addresses->addresses);
- gpr_free(addresses);
-}
-
struct grpc_resolver_result {
gpr_refcount refs;
grpc_lb_policy* lb_policy;
diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h
index ff6fa74f11..5a69d81990 100644
--- a/src/core/ext/client_config/resolver_result.h
+++ b/src/core/ext/client_config/resolver_result.h
@@ -37,38 +37,17 @@
#include "src/core/ext/client_config/lb_policy.h"
#include "src/core/lib/iomgr/resolve_address.h"
-/// Used to represent addresses returned by the resolver.
-typedef struct grpc_address {
- grpc_resolved_address address;
- bool is_balancer;
-} grpc_address;
-
-typedef struct grpc_addresses {
- size_t num_addresses;
- grpc_address* addresses;
-} grpc_addresses;
-
-/// Returns a grpc_addresses struct with enough space for
-/// \a num_addresses addresses.
-grpc_addresses* grpc_addresses_create(size_t num_addresses);
-
-void grpc_addresses_set_address(grpc_addresses* addresses, size_t index,
- void* address, size_t address_len,
- bool is_balancer);
-
-void grpc_addresses_destroy(grpc_addresses* addresses);
-
/// Results reported from a grpc_resolver.
typedef struct grpc_resolver_result grpc_resolver_result;
grpc_resolver_result* grpc_resolver_result_create();
-void grpc_resolver_result_ref(grpc_resolver_result* client_config);
+void grpc_resolver_result_ref(grpc_resolver_result* result);
void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
- grpc_resolver_result* client_config);
+ grpc_resolver_result* result);
-void grpc_resolver_result_set_lb_policy(grpc_resolver_result* client_config,
+void grpc_resolver_result_set_lb_policy(grpc_resolver_result* result,
grpc_lb_policy* lb_policy);
grpc_lb_policy* grpc_resolver_result_get_lb_policy(
- grpc_resolver_result* client_config);
+ grpc_resolver_result* result);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 61721a4a9e..a9b6f8d9e4 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -76,9 +76,9 @@
* operations in progress over the old RR instance. This is done by
* decreasing the reference count on the old policy. The moment no more
* references are held on the old RR policy, it'll be destroyed and \a
- * rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state.
- * At this point we can transition to a new RR instance safely, which is done
- * once again via \a rr_handover().
+ * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
+ * state. At this point we can transition to a new RR instance safely, which
+ * is done once again via \a rr_handover().
*
*
* Once a RR policy instance is in place (and getting updated as described),
@@ -96,6 +96,8 @@
* - Implement LB service forwarding (point 2c. in the doc's diagram).
*/
+#include <errno.h>
+
#include <string.h>
#include <grpc/byte_buffer_reader.h>
@@ -105,22 +107,50 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/client_config/client_channel_factory.h"
+#include "src/core/ext/client_config/lb_policy_factory.h"
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/static_metadata.h"
int grpc_lb_glb_trace = 0;
+/* add lb_token of selected subchannel (address) to the call's initial
+ * metadata */
+static void initial_metadata_add_lb_token(
+ grpc_metadata_batch *initial_metadata,
+ grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
+ GPR_ASSERT(lb_token_mdelem_storage != NULL);
+ GPR_ASSERT(lb_token != NULL);
+ grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+ lb_token);
+}
+
typedef struct wrapped_rr_closure_arg {
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */
grpc_closure *wrapped_closure;
+ /* the pick's initial metadata, kept in order to append the LB token for the
+ * pick */
+ grpc_metadata_batch *initial_metadata;
+
+ /* the picked target, used to determine which LB token to add to the pick's
+ * initial metadata */
+ grpc_connected_subchannel **target;
+
+ /* the LB token associated with the pick */
+ grpc_mdelem *lb_token;
+
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
/* The RR instance related to the closure */
grpc_lb_policy *rr_policy;
@@ -143,6 +173,11 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
}
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+
+ initial_metadata_add_lb_token(wc_arg->initial_metadata,
+ wc_arg->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(wc_arg->lb_token));
+
grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
gpr_free(wc_arg->owning_pending_node);
}
@@ -164,6 +199,9 @@ typedef struct pending_pick {
/* the initial metadata for the pick. See grpc_lb_policy_pick() */
grpc_metadata_batch *initial_metadata;
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
/* bitmask passed to pick() and used for selective cancelling. See
* grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
@@ -180,20 +218,23 @@ typedef struct pending_pick {
wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
-static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+static void add_pending_pick(pending_pick **root,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pending_pick *pp = gpr_malloc(sizeof(*pp));
memset(pp, 0, sizeof(pending_pick));
memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
pp->next = *root;
- pp->pollent = pollent;
+ pp->pollent = pick_args->pollent;
pp->target = target;
- pp->initial_metadata = initial_metadata;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata = pick_args->initial_metadata;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
+ pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
+ pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
+ pick_args->lb_token_mdelem_storage;
grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
&pp->wrapped_on_complete_arg);
*root = pp;
@@ -279,58 +320,127 @@ struct rr_connectivity_data {
glb_lb_policy *glb_policy;
};
-static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
- const grpc_grpclb_serverlist *serverlist,
- glb_lb_policy *glb_policy) {
- /* TODO(dgq): support mixed ip version */
- GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
- char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers);
- for (size_t i = 0; i < serverlist->num_servers; ++i) {
- gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address,
- serverlist->servers[i]->port);
+static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
+ bool log) {
+ const grpc_grpclb_ip_address *ip = &server->ip_address;
+ if (server->port >> 16 != 0) {
+ if (log) {
+ gpr_log(GPR_ERROR,
+ "Invalid port '%d' at index %zu of serverlist. Ignoring.",
+ server->port, idx);
+ }
+ return false;
}
- size_t uri_path_len;
- char *concat_ipports = gpr_strjoin_sep(
- (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
+ if (ip->size != 4 && ip->size != 16) {
+ if (log) {
+ gpr_log(GPR_ERROR,
+ "Expected IP to be 4 or 16 bytes, got %d at index %zu of "
+ "serverlist. Ignoring",
+ ip->size, idx);
+ }
+ return false;
+ }
+ return true;
+}
- grpc_lb_policy_args args;
- args.client_channel_factory = glb_policy->cc_factory;
- args.addresses = grpc_addresses_create(serverlist->num_servers);
- size_t out_addrs_idx = 0;
+/* Returns a grpc_lb_addresses instance populated from the valid entries in
+ * \a serverlist, or NULL if no entry is valid */
+static grpc_lb_addresses* process_serverlist(
+ const grpc_grpclb_serverlist *serverlist) {
+ size_t num_valid = 0;
+ /* first pass: count how many are valid in order to allocate the necessary
+ * memory in a single block */
for (size_t i = 0; i < serverlist->num_servers; ++i) {
- grpc_uri uri;
- struct sockaddr_storage sa;
- size_t sa_len;
- uri.path = host_ports[i];
- if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
- /* These are, of course, actually balancer addresses. However, we
- * want the round_robin LB policy to treat them as normal backend
- * addresses, since we don't need to talk to balancers in order to
- * find the balancers themselves, so we set is_balancer=false. */
- grpc_addresses_set_address(args.addresses, out_addrs_idx, &sa, sa_len,
- false /* is_balancer */);
- ++out_addrs_idx;
+ if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
+ }
+ if (num_valid == 0) return NULL;
+
+ grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid);
+
+ /* second pass: actually populate the addresses and LB tokens (aka user data
+ * to the outside world) to be read by the RR policy during its creation.
+ * Given that the validity tests are very cheap, they are performed again
+ * instead of marking the valid ones during the first pass, as this would
+ * incur an allocation due to the arbitrary number of servers */
+ size_t addr_idx = 0;
+ for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
+ GPR_ASSERT(addr_idx < num_valid);
+ const grpc_grpclb_server *server = serverlist->servers[sl_idx];
+ if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
+
+ /* address processing */
+ const uint16_t netorder_port = htons((uint16_t)server->port);
+ /* the addresses are given in binary format (a in(6)_addr struct) in
+ * server->ip_address.bytes. */
+ const grpc_grpclb_ip_address *ip = &server->ip_address;
+ grpc_resolved_address addr;
+ memset(&addr, 0, sizeof(addr));
+ if (ip->size == 4) {
+ addr.len = sizeof(struct sockaddr_in);
+ struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr;
+ addr4->sin_family = AF_INET;
+ memcpy(&addr4->sin_addr, ip->bytes, ip->size);
+ addr4->sin_port = netorder_port;
+ } else if (ip->size == 16) {
+ addr.len = sizeof(struct sockaddr_in6);
+ struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr;
+ addr6->sin6_family = AF_INET6;
+ memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
+ addr6->sin6_port = netorder_port;
+ }
+
+ /* lb token processing */
+ void* user_data;
+ if (server->has_load_balance_token) {
+ const size_t lb_token_size =
+ GPR_ARRAY_SIZE(server->load_balance_token) - 1;
+ grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
+ (uint8_t *)server->load_balance_token, lb_token_size);
+ user_data = grpc_mdelem_from_metadata_strings(
+ GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
} else {
- gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
- host_ports[i]);
+ gpr_log(GPR_ERROR,
+ "Missing LB token for backend address '%s'. The empty token will "
+ "be used instead",
+ grpc_sockaddr_to_uri((struct sockaddr *)&addr.addr));
+ user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
}
+
+ grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr,
+ addr.len, false /* is_balancer */,
+ NULL /* balancer_name */, user_data);
+ ++addr_idx;
}
+ GPR_ASSERT(addr_idx == num_valid);
+
+ return lb_addresses;
+}
+
+static void lb_token_destroy(void* token) {
+ if (token != NULL) GRPC_MDELEM_UNREF(token);
+}
+
+static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
+ const grpc_grpclb_serverlist *serverlist,
+ glb_lb_policy *glb_policy) {
+ GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
+
+ grpc_lb_policy_args args;
+ memset(&args, 0, sizeof(args));
+ args.client_channel_factory = glb_policy->cc_factory;
+ args.addresses = process_serverlist(serverlist);
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
- gpr_free(concat_ipports);
- for (size_t i = 0; i < serverlist->num_servers; i++) {
- gpr_free(host_ports[i]);
- }
- gpr_free(host_ports);
- grpc_addresses_destroy(args.addresses);
+ grpc_lb_addresses_destroy(args.addresses, lb_token_destroy);
return rr;
}
static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_error *error) {
- GRPC_ERROR_REF(error);
+ GPR_ASSERT(glb_policy->serverlist != NULL &&
+ glb_policy->serverlist->num_servers > 0);
glb_policy->rr_policy =
create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
@@ -345,8 +455,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
&glb_policy->rr_connectivity->on_change);
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- glb_policy->rr_connectivity->state, error,
- "rr_handover");
+ glb_policy->rr_connectivity->state,
+ GRPC_ERROR_REF(error), "rr_handover");
grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
/* flush pending ops */
@@ -359,9 +469,12 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent,
- pp->initial_metadata, pp->initial_metadata_flags,
- pp->target, &pp->wrapped_on_complete);
+ const grpc_lb_policy_pick_args pick_args = {
+ pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
+ pp->lb_token_mdelem_storage};
+ grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
+ (void **)&pp->wrapped_on_complete_arg.lb_token,
+ &pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp;
}
@@ -378,13 +491,13 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
&pping->wrapped_notify);
pping->wrapped_notify_arg.owning_pending_node = pping;
}
- GRPC_ERROR_UNREF(error);
}
-static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
rr_connectivity_data *rr_conn_data = arg;
glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
+
if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
if (glb_policy->serverlist != NULL) {
/* a RR policy is shutting down but there's a serverlist available ->
@@ -398,8 +511,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (error == GRPC_ERROR_NONE) {
/* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- rr_conn_data->state, error,
- "rr_connectivity_changed");
+ rr_conn_data->state, GRPC_ERROR_REF(error),
+ "glb_rr_connectivity_changed");
/* resubscribe */
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state,
@@ -408,7 +521,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(rr_conn_data);
}
}
- GRPC_ERROR_UNREF(error);
}
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
@@ -431,27 +543,36 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
memset(glb_policy, 0, sizeof(*glb_policy));
/* All input addresses in args->addresses come from a resolver that claims
- * they are LB services. It's the resolver's responsibility to make sure this
+ * they are LB services. It's the resolver's responsibility to make sure
+ * this policy is only instantiated and used in that case.
*
* Create a client channel over them to communicate with a LB service */
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
- /* construct a target from the args->addresses, in the form
+ /* construct a target from the addresses in args, given in the form
* ipvX://ip1:port1,ip2:port2,...
* TODO(dgq): support mixed ip version */
char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
- addr_strs[0] = grpc_sockaddr_to_uri(
- (const struct sockaddr *)&args->addresses->addresses[0].address.addr);
- size_t addr_index = 1;
- for (size_t i = 1; i < args->addresses->num_addresses; i++) {
+ size_t addr_index = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ if (args->addresses->addresses[i].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
if (args->addresses->addresses[i].is_balancer) {
- GPR_ASSERT(grpc_sockaddr_to_string(
- &addr_strs[addr_index++],
- (const struct sockaddr *)&args->addresses->addresses[i]
- .address.addr,
- true) == 0);
+ if (addr_index == 0) {
+ addr_strs[addr_index++] = grpc_sockaddr_to_uri(
+ (const struct sockaddr *)&args->addresses->addresses[i]
+ .address.addr);
+ } else {
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_strs[addr_index++],
+ (const struct sockaddr *)&args->addresses->addresses[i]
+ .address.addr,
+ true) == 0);
+ }
}
}
size_t uri_path_len;
@@ -477,7 +598,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
rr_connectivity_data *rr_connectivity =
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
- grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed,
+ grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
rr_connectivity);
rr_connectivity->glb_policy = glb_policy;
glb_policy->rr_connectivity = rr_connectivity;
@@ -560,7 +681,6 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
- gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -590,7 +710,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
exec_ctx, pp->pollent, glb_policy->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
- gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -617,12 +736,21 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+
+ if (pick_args->lb_token_mdelem_storage == NULL) {
+ *target = NULL;
+ grpc_exec_ctx_sched(
+ exec_ctx, on_complete,
+ GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
+ "won't work without it. Failing"),
+ NULL);
+ return 1;
+ }
+
gpr_mu_lock(&glb_policy->mu);
int r;
@@ -635,28 +763,34 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
glb_policy->wc_arg.wrapped_closure = on_complete;
+ glb_policy->wc_arg.lb_token_mdelem_storage =
+ pick_args->lb_token_mdelem_storage;
+ glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
+ glb_policy->wc_arg.owning_pending_node = NULL;
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
&glb_policy->wc_arg);
- r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
- initial_metadata, initial_metadata_flags, target,
+
+ r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
+ (void **)&glb_policy->wc_arg.lb_token,
&glb_policy->wrapped_on_complete);
if (r != 0) {
- /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
- * policy and notify the original callback */
- glb_policy->wc_arg.wrapped_closure = NULL;
+ /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
(intptr_t)glb_policy->wc_arg.rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
- grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure,
- GRPC_ERROR_NONE, NULL);
+
+ /* add the load reporting initial metadata */
+ initial_metadata_add_lb_token(
+ pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
}
} else {
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
glb_policy->base.interested_parties);
- add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata,
- initial_metadata_flags, target, on_complete);
+ add_pending_pick(&glb_policy->pending_picks, pick_args, target,
+ on_complete);
if (!glb_policy->started_picking) {
start_picking(exec_ctx, glb_policy);
@@ -716,9 +850,6 @@ typedef struct lb_client_data {
/* called once initial metadata's been sent */
grpc_closure md_sent;
- /* called once initial metadata's been received */
- grpc_closure md_rcvd;
-
/* called once the LoadBalanceRequest has been sent to the LB server. See
* src/proto/grpc/.../load_balancer.proto */
grpc_closure req_sent;
@@ -755,7 +886,6 @@ typedef struct lb_client_data {
} lb_client_data;
static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
@@ -770,7 +900,6 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
gpr_mu_init(&lb_client->mu);
grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
- grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client);
grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
@@ -869,23 +998,6 @@ static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_op ops[1];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
- op->op = GRPC_OP_RECV_INITIAL_METADATA;
- op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
- op->flags = 0;
- op->reserved = NULL;
- op++;
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
- &lb_client->md_rcvd);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
-}
-
-static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- lb_client_data *lb_client = arg;
- GPR_ASSERT(lb_client->lb_call);
- grpc_op ops[1];
- memset(ops, 0, sizeof(ops));
- grpc_op *op = ops;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = lb_client->request_payload;
@@ -900,11 +1012,18 @@ static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
lb_client_data *lb_client = arg;
+ GPR_ASSERT(lb_client->lb_call);
- grpc_op ops[1];
+ grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &lb_client->response_payload;
op->flags = 0;
@@ -923,8 +1042,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_op *op = ops;
if (lb_client->response_payload != NULL) {
/* Received data from the LB server. Look inside
- * lb_client->response_payload, for
- * a serverlist. */
+ * lb_client->response_payload, for a serverlist. */
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
@@ -961,7 +1079,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
} else {
/* unref the RR policy, eventually leading to its substitution with a
* new one constructed from the received serverlist (see
- * rr_connectivity_changed) */
+ * glb_rr_connectivity_changed) */
GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
"serverlist_received");
}
@@ -1024,8 +1142,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
lb_client->status, lb_client->status_details,
lb_client->status_details_capacity);
}
- /* TODO(dgq): deal with stream termination properly (fire up another one? fail
- * the original call?) */
+ /* TODO(dgq): deal with stream termination properly (fire up another one?
+ * fail the original call?) */
}
/* Code wiring the policy with the rest of the core */
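process_serverlist above converts each valid serverlist entry from a binary IP plus host-order port into a gRPC resolved address. A standalone sketch of that conversion, using plain POSIX sockaddr types instead of the in-tree grpc_resolved_address (ip_bytes_to_sockaddr is hypothetical):

#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

static bool ip_bytes_to_sockaddr(const uint8_t *ip_bytes, size_t ip_size,
                                 int port, struct sockaddr_storage *out,
                                 size_t *out_len) {
  memset(out, 0, sizeof(*out));
  if (port >> 16 != 0) return false; /* same port check as is_server_valid */
  const uint16_t netorder_port = htons((uint16_t)port);
  if (ip_size == 4) {
    struct sockaddr_in *addr4 = (struct sockaddr_in *)out;
    addr4->sin_family = AF_INET;
    memcpy(&addr4->sin_addr, ip_bytes, ip_size);
    addr4->sin_port = netorder_port;
    *out_len = sizeof(*addr4);
    return true;
  }
  if (ip_size == 16) {
    struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)out;
    addr6->sin6_family = AF_INET6;
    memcpy(&addr6->sin6_addr, ip_bytes, ip_size);
    addr6->sin6_port = netorder_port;
    *out_len = sizeof(*addr6);
    return true;
  }
  return false; /* mirrors is_server_valid's 4-or-16-byte check */
}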
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
index f4720a1345..a8881004a0 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
@@ -57,6 +57,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
if (dec_arg->first_pass) { /* count how many server do we have */
grpc_grpclb_server server;
if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
}
dec_arg->num_servers++;
@@ -69,6 +70,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers);
}
if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
}
dec_arg->servers[dec_arg->decoding_idx++] = server;
@@ -118,6 +120,7 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse(
grpc_grpclb_response res;
memset(&res, 0, sizeof(grpc_grpclb_response));
if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
grpc_grpclb_initial_response *initial_res =
@@ -145,6 +148,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
arg.first_pass = true;
status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
@@ -152,6 +156,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
status =
pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream_at_start));
return NULL;
}
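Each hunk in this file adds the same decode-and-log pattern. A minimal sketch, assuming nanopb's pb_decode.h under the in-tree third_party path (decode_or_log_example is hypothetical); PB_GET_ERROR yields the stream's recorded error message:

#include <grpc/support/log.h>
#include "third_party/nanopb/pb_decode.h"

static bool decode_or_log_example(const uint8_t *buf, size_t len,
                                  const pb_field_t fields[], void *msg) {
  pb_istream_t stream = pb_istream_from_buffer(buf, len);
  if (!pb_decode(&stream, fields, msg)) {
    /* log the message recorded on the stream that actually failed */
    gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
    return false;
  }
  return true;
}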
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
index 9726c87a37..c1e73d08ef 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
@@ -45,6 +45,7 @@ extern "C" {
#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128
+typedef grpc_lb_v1_Server_ip_address_t grpc_grpclb_ip_address;
typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request;
typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response;
typedef grpc_lb_v1_Server grpc_grpclb_server;
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index 52e11c40bb..2676714175 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -31,10 +31,11 @@
*
*/
/* Automatically generated nanopb constant definitions */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev */
#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@@ -72,7 +73,7 @@ const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3] = {
};
const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3] = {
- PB_FIELD( 2, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
+ PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v1_Duration_fields),
PB_LAST_FIELD
};
@@ -84,7 +85,7 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
};
const pb_field_t grpc_lb_v1_Server_fields[5] = {
- PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
+ PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_request, load_balance_token, 0),
@@ -116,3 +117,4 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
#endif
+/* @@protoc_insertion_point(eof) */
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index 46fe588f72..4f1031ec7b 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -31,11 +31,12 @@
*
*/
/* Automatically generated nanopb header */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev */
-#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
-#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
+#ifndef PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
+#define PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
#include "third_party/nanopb/pb.h"
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@@ -52,6 +53,7 @@ typedef struct _grpc_lb_v1_ClientStats {
int64_t client_rpc_errors;
bool has_dropped_requests;
int64_t dropped_requests;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
} grpc_lb_v1_ClientStats;
typedef struct _grpc_lb_v1_Duration {
@@ -59,22 +61,26 @@ typedef struct _grpc_lb_v1_Duration {
int64_t seconds;
bool has_nanos;
int32_t nanos;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Duration) */
} grpc_lb_v1_Duration;
typedef struct _grpc_lb_v1_InitialLoadBalanceRequest {
bool has_name;
char name[128];
+/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceRequest) */
} grpc_lb_v1_InitialLoadBalanceRequest;
+typedef PB_BYTES_ARRAY_T(16) grpc_lb_v1_Server_ip_address_t;
typedef struct _grpc_lb_v1_Server {
bool has_ip_address;
- char ip_address[46];
+ grpc_lb_v1_Server_ip_address_t ip_address;
bool has_port;
int32_t port;
bool has_load_balance_token;
- char load_balance_token[64];
+ char load_balance_token[65];
bool has_drop_request;
bool drop_request;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
@@ -82,6 +88,7 @@ typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
char load_balancer_delegate[64];
bool has_client_stats_report_interval;
grpc_lb_v1_Duration client_stats_report_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceResponse) */
} grpc_lb_v1_InitialLoadBalanceResponse;
typedef struct _grpc_lb_v1_LoadBalanceRequest {
@@ -89,12 +96,14 @@ typedef struct _grpc_lb_v1_LoadBalanceRequest {
grpc_lb_v1_InitialLoadBalanceRequest initial_request;
bool has_client_stats;
grpc_lb_v1_ClientStats client_stats;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceRequest) */
} grpc_lb_v1_LoadBalanceRequest;
typedef struct _grpc_lb_v1_ServerList {
pb_callback_t servers;
bool has_expiration_interval;
grpc_lb_v1_Duration expiration_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
} grpc_lb_v1_ServerList;
typedef struct _grpc_lb_v1_LoadBalanceResponse {
@@ -102,6 +111,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
grpc_lb_v1_InitialLoadBalanceResponse initial_response;
bool has_server_list;
grpc_lb_v1_ServerList server_list;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceResponse) */
} grpc_lb_v1_LoadBalanceResponse;
/* Default values for struct fields */
@@ -114,7 +124,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
@@ -122,7 +132,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
/* Field tags (for use in manual encoding/decoding) */
#define grpc_lb_v1_ClientStats_total_requests_tag 1
@@ -135,7 +145,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
#define grpc_lb_v1_Server_drop_request_tag 4
-#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 2
+#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 3
#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
@@ -161,7 +171,8 @@ extern const pb_field_t grpc_lb_v1_Server_fields[5];
#define grpc_lb_v1_ClientStats_size 33
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
-#define grpc_lb_v1_Server_size 127
+/* grpc_lb_v1_ServerList_size depends on runtime parameters */
+#define grpc_lb_v1_Server_size 98
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
@@ -174,5 +185,6 @@ extern const pb_field_t grpc_lb_v1_Server_fields[5];
#ifdef __cplusplus
} /* extern "C" */
#endif
+/* @@protoc_insertion_point(eof) */
#endif
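The STRING-to-BYTES change above means ip_address is now a length-prefixed byte array (PB_BYTES_ARRAY_T(16) expands to a struct with size and bytes members) rather than a NUL-terminated text address. A sketch of how a consumer reads it, matching process_serverlist's use of ip->size and ip->bytes (server_raw_ip_example is hypothetical):

#include <stddef.h>
#include <stdint.h>
#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"

static const uint8_t *server_raw_ip_example(const grpc_lb_v1_Server *server,
                                            size_t *size) {
  if (!server->has_ip_address) return NULL;
  *size = server->ip_address.size;  /* 4 for IPv4, 16 for IPv6 */
  return server->ip_address.bytes;  /* binary in(6)_addr, not text */
}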
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9f3a6939e6..09df92dd99 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -199,10 +199,8 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
@@ -225,13 +223,13 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pollent;
+ pp->pollent = pick_args->pollent;
pp->target = target;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@@ -462,6 +460,11 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
/* Skip balancer addresses, since we only know how to handle backends. */
if (args->addresses->addresses[i].is_balancer) continue;
+ if (args->addresses->addresses[i].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index fc1534887a..5593a5eccf 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -66,6 +66,7 @@
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/static_metadata.h"
typedef struct round_robin_lb_policy round_robin_lb_policy;
@@ -76,15 +77,32 @@ int grpc_lb_round_robin_trace = 0;
* Once a pick is available, \a target is updated and \a on_complete called. */
typedef struct pending_pick {
struct pending_pick *next;
+
+ /* polling entity for the pick()'s async notification */
grpc_polling_entity *pollent;
+
+ /* output argument in which to store the pick()ed user_data. It'll be NULL if
+ * such data is present or there's an error (the definite test for errors is
+ * \a target being NULL). */
+ void **user_data;
+
+ /* bitmask passed to pick() and used for selective cancelling. See
+ * grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
+
+ /* output argument in which to store the pick()ed connected subchannel, or
+ * upon error. */
grpc_connected_subchannel **target;
+
+ /* to be invoked once the pick() has completed (regardless of success) */
grpc_closure *on_complete;
} pending_pick;
/** List of subchannels in a connectivity READY state */
typedef struct ready_list {
grpc_subchannel *subchannel;
+ /* references namesake entry in subchannel_data */
+ void *user_data;
struct ready_list *next;
struct ready_list *prev;
} ready_list;
@@ -102,12 +120,21 @@ typedef struct {
ready_list *ready_list_node;
/** last observed connectivity */
grpc_connectivity_state connectivity_state;
+ /** the subchannel's target user data */
+ void *user_data;
} subchannel_data;
struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
+ /** total number of addresses received at creation time */
+ size_t num_addresses;
+ /** array holding the borrowed and opaque pointers to incoming user data, one
+ * per incoming address. These individual pointers will be returned as-is in
+ * successful picks. */
+ void **user_data_pointers;
+
/** all our subchannels */
size_t num_subchannels;
subchannel_data **subchannels;
@@ -166,16 +193,19 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) {
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)",
- p->ready_list_last_pick, p->ready_list_last_pick->subchannel);
+ (void *)p->ready_list_last_pick,
+ (void *)p->ready_list_last_pick->subchannel);
}
}
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
* csc to the list of ready subchannels. */
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- grpc_subchannel *sc) {
+ subchannel_data *sd) {
ready_list *new_elem = gpr_malloc(sizeof(ready_list));
- new_elem->subchannel = sc;
+ memset(new_elem, 0, sizeof(ready_list));
+ new_elem->subchannel = sd->subchannel;
+ new_elem->user_data = sd->user_data;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
@@ -189,7 +219,8 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
p->ready_list.prev = new_elem;
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc);
+ gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
+ (void *)new_elem, (void *)sd->subchannel);
}
return new_elem;
}
@@ -216,8 +247,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node,
- node->subchannel);
+ gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node,
+ (void *)node->subchannel);
}
node->next = NULL;
@@ -229,9 +260,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- size_t i;
ready_list *elem;
- for (i = 0; i < p->num_subchannels; i++) {
+ for (size_t i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
gpr_free(sd);
@@ -251,6 +281,8 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(elem);
elem = tmp;
}
+
+ gpr_free(p->user_data_pointers);
gpr_free(p);
}
@@ -337,7 +369,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
p->started_picking = 1;
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p,
+ gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p,
p->num_subchannels);
}
@@ -361,38 +393,43 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
ready_list *selected;
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
+ /* readily available, report right away */
gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+
+ if (user_data != NULL) {
+ *user_data = selected->user_data;
+ }
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target,
- selected);
+ "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
+ (void *)*target, (void *)selected);
}
/* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p);
return 1;
} else {
+    /* no pick currently available; queue it in the list of pending picks */
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pollent;
+ pp->pollent = pick_args->pollent;
pp->target = target;
pp->on_complete = on_complete;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->user_data = user_data;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
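
This signature change bundles every pick input into a single grpc_lb_policy_pick_args and adds an optional user_data out-parameter, so callers that do not need per-address data can pass NULL. A compilable sketch of that calling convention, with hypothetical stand-in types rather than the real grpc headers:

#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-ins for the grpc types involved. */
typedef struct { int dummy; } connected_subchannel;

typedef struct {
  /* all pick inputs travel together, so adding a field (like the LB-token
   * mdelem storage in this change) does not ripple through every vtable
   * and call site */
  const char *initial_metadata; /* stand-in for grpc_metadata_batch* */
  unsigned initial_metadata_flags;
} pick_args;

/* Returns 1 on an immediately available pick; fills *user_data only if
 * the caller asked for it (user_data != NULL). */
static int pick(const pick_args *args, connected_subchannel **target,
                void **user_data) {
  static connected_subchannel sc;
  (void)args;
  *target = &sc;
  if (user_data != NULL) *user_data = (void *)"per-address token";
  return 1;
}

int main(void) {
  connected_subchannel *target;
  void *token;
  pick_args args = {"md", 0};
  if (pick(&args, &target, &token)) {
    printf("picked %p, token=%s\n", (void *)target, (const char *)token);
  }
  /* callers that don't care about user_data just pass NULL: */
  pick(&args, &target, NULL);
  return 0;
}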
@@ -421,7 +458,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
+ sd->ready_list_node = add_connected_sc_locked(p, sd);
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemptively replicates rr_pick()'s actions. */
@@ -433,12 +470,16 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
+
*pp->target =
grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ if (pp->user_data != NULL) {
+ *pp->user_data = selected->user_data;
+ }
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- selected->subchannel, selected);
+ (void *)selected->subchannel, (void *)selected);
}
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
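
The loop above hands the newly READY subchannel to every queued pick, copying the optional user_data exactly as rr_pick would. A self-contained sketch of draining such a singly-linked pending queue (hypothetical types; the real loop also runs under p->mu and updates pollset membership):

#include <stdlib.h>

typedef struct pending_pick {
  struct pending_pick *next;
  void **target;    /* where the caller wants the result */
  void **user_data; /* may be NULL if the caller didn't ask */
} pending_pick;

/* Pop-and-complete every pending pick now that `selected` is available. */
static void drain_pending(pending_pick **head, void *selected,
                          void *selected_user_data) {
  pending_pick *pp;
  while ((pp = *head) != NULL) {
    *head = pp->next;
    *pp->target = selected;
    if (pp->user_data != NULL) {
      *pp->user_data = selected_user_data;
    }
    /* ...signal the pick's on_complete closure here... */
    free(pp);
  }
}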
@@ -582,8 +623,11 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
+ p->num_addresses = num_addrs;
p->subchannels = gpr_malloc(sizeof(*p->subchannels) * num_addrs);
memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
+ p->user_data_pointers = gpr_malloc(sizeof(void *) * num_addrs);
+ memset(p->user_data_pointers, 0, sizeof(void *) * num_addrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
@@ -606,13 +650,18 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
+    p->user_data_pointers[subchannel_idx] =
+        args->addresses->addresses[i].user_data;
+    sd->user_data = p->user_data_pointers[subchannel_idx];
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
}
}
if (subchannel_idx == 0) {
+ /* couldn't create any subchannel. Bail out */
gpr_free(p->subchannels);
+ gpr_free(p->user_data_pointers);
gpr_free(p);
return NULL;
}
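
user_data_pointers is kept as a parallel array indexed by subchannel_idx: i walks the input addresses while subchannel_idx only advances on successful subchannel creation, so failed addresses are compacted out of both arrays. A self-contained sketch of that filter-and-stay-aligned pattern (hypothetical names):

#include <stdlib.h>

/* Build two parallel arrays from `n` candidate inputs, skipping inputs
 * for which creation fails; out_idx stays aligned across both arrays. */
static size_t build_slots(void *(*create)(int input), const int *inputs,
                          void **input_user_data, size_t n,
                          void **handles, void **user_data_out) {
  size_t out_idx = 0;
  for (size_t i = 0; i < n; i++) {
    void *h = create(inputs[i]);
    if (h == NULL) continue; /* skip failed inputs */
    handles[out_idx] = h;
    user_data_out[out_idx] = input_user_data[i]; /* read [i], write [out_idx] */
    ++out_idx;
  }
  return out_idx; /* 0 means "couldn't create any"; caller bails out */
}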
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 8fc10d98a8..63682db7de 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -173,17 +173,19 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
if (r->addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.addresses = grpc_addresses_create(r->addresses->naddrs);
+ lb_policy_args.addresses = grpc_lb_addresses_create(r->addresses->naddrs);
for (size_t i = 0; i < r->addresses->naddrs; ++i) {
- grpc_addresses_set_address(
+ grpc_lb_addresses_set_address(
lb_policy_args.addresses, i, &r->addresses->addrs[i].addr,
- r->addresses->addrs[i].len, false /* is_balancer */);
+ r->addresses->addrs[i].len, false /* is_balancer */,
+ NULL /* balancer_name */, NULL /* user_data */);
}
grpc_resolved_addresses_destroy(r->addresses);
lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
- grpc_addresses_destroy(lb_policy_args.addresses);
+ grpc_lb_addresses_destroy(lb_policy_args.addresses,
+ NULL /* user_data_destroy */);
result = grpc_resolver_result_create();
if (lb_policy != NULL) {
grpc_resolver_result_set_lb_policy(result, lb_policy);
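
grpc_lb_addresses_destroy takes an optional user_data destructor because some producers (the grpclb policy) attach per-address user data, while plain resolvers such as this one pass NULL. A minimal sketch of a container with that shape, using hypothetical names rather than the real grpc implementation:

#include <stdlib.h>

typedef struct {
  void *address;       /* stand-in for the resolved sockaddr + length */
  int is_balancer;
  char *balancer_name; /* only meaningful when is_balancer is set */
  void *user_data;
} lb_address;

typedef struct {
  size_t num_addresses;
  lb_address *addresses;
} lb_addresses;

static lb_addresses *lb_addresses_create(size_t n) {
  lb_addresses *a = calloc(1, sizeof(*a));
  a->num_addresses = n;
  a->addresses = calloc(n, sizeof(lb_address));
  return a;
}

/* user_data_destroy may be NULL: producers that never set user_data
 * (e.g. the DNS and sockaddr resolvers in this diff) pass NULL. */
static void lb_addresses_destroy(lb_addresses *a,
                                 void (*user_data_destroy)(void *)) {
  for (size_t i = 0; i < a->num_addresses; i++) {
    if (user_data_destroy != NULL && a->addresses[i].user_data != NULL) {
      user_data_destroy(a->addresses[i].user_data);
    }
    free(a->addresses[i].balancer_name); /* free(NULL) is a no-op */
  }
  free(a->addresses);
  free(a);
}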
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 94d2f892eb..307e0698f9 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -58,7 +58,7 @@ typedef struct {
char *lb_policy_name;
/** the addresses that we've 'resolved' */
- grpc_addresses *addresses;
+ grpc_lb_addresses *addresses;
/** mutex guarding the rest of the state */
gpr_mu mu;
@@ -129,6 +129,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
lb_policy_args.client_channel_factory = r->client_channel_factory;
grpc_lb_policy *lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
+ gpr_free(lb_policy_args.addresses);
grpc_resolver_result_set_lb_policy(result, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
r->published = true;
@@ -142,7 +143,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
- grpc_addresses_destroy(r->addresses);
+ grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
gpr_free(r->lb_policy_name);
gpr_free(r);
}
@@ -216,7 +217,7 @@ static grpc_resolver *sockaddr_create(
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
- r->addresses = grpc_addresses_create(path_parts.count);
+ r->addresses = grpc_lb_addresses_create(path_parts.count);
for (size_t i = 0; i < r->addresses->num_addresses; i++) {
grpc_uri ith_uri = *args->uri;
char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
@@ -235,7 +236,7 @@ static grpc_resolver *sockaddr_create(
gpr_slice_unref(path_slice);
if (errors_found) {
gpr_free(r->lb_policy_name);
- grpc_addresses_destroy(r->addresses);
+ grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
gpr_free(r);
return NULL;
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 029c15014e..366690acf2 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -294,7 +294,7 @@ static void remove_from_storage(struct stream_obj *s,
/*
Cycle through ops and try to take next action. Break when either
an action with callback is taken, or no action is possible.
- This can be executed from the Cronet network thread via cronet callback
+ This can get executed from the Cronet network thread via cronet callback
or on the application supplied thread via the perform_stream_op function.
*/
static void execute_from_storage(stream_obj *s) {
@@ -329,6 +329,7 @@ static void execute_from_storage(stream_obj *s) {
static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_FAILED] = true;
s->cbs = NULL;
@@ -340,6 +341,7 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -349,6 +351,7 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
static void on_canceled(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_CANCELED] = true;
s->cbs = NULL;
@@ -360,6 +363,7 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -369,9 +373,11 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
static void on_succeeded(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_SUCCEEDED] = true;
s->cbs = NULL;
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
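
The locking added to these Cronet callbacks follows one discipline: take s->mu, flip the relevant state flags, release the lock, and only then call execute_from_storage, which acquires the mutex itself (on_read_completed below follows the same rule, unlocking in every branch before execute_from_storage). A self-contained sketch of that discipline, with pthreads standing in for gpr_mu and hypothetical names:

#include <pthread.h>
#include <stdbool.h>

typedef struct {
  pthread_mutex_t mu; /* assume initialized with pthread_mutex_init */
  bool callback_received;
} stream_state;

static void drive_next_op(stream_state *s) {
  /* takes s->mu internally, like execute_from_storage */
  pthread_mutex_lock(&s->mu);
  /* ...inspect flags and decide the next action... */
  pthread_mutex_unlock(&s->mu);
}

static void on_event(stream_state *s) {
  pthread_mutex_lock(&s->mu);
  s->callback_received = true;  /* mutate shared state under the lock */
  pthread_mutex_unlock(&s->mu); /* release BEFORE re-entering the op loop;
                                   holding it would deadlock on the
                                   non-recursive mutex inside drive_next_op */
  drive_next_op(s);
}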
@@ -381,6 +387,7 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
/* Free the memory allocated for headers */
@@ -388,6 +395,7 @@ static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
gpr_free(s->header_array.headers);
s->header_array.headers = NULL;
}
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -401,6 +409,7 @@ static void on_response_headers_received(
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
headers, negotiated_protocol);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
memset(&s->state.rs.initial_metadata, 0,
sizeof(s->state.rs.initial_metadata));
grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata);
@@ -412,6 +421,7 @@ static void on_response_headers_received(
grpc_mdstr_from_string(headers->headers[i].value)));
}
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -422,11 +432,13 @@ static void on_write_completed(cronet_bidirectional_stream *stream,
const char *data) {
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
+ gpr_mu_lock(&s->mu);
if (s->state.ws.write_buffer) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
s->state.state_callback_received[OP_SEND_MESSAGE] = true;
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -438,6 +450,7 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count);
+ gpr_mu_lock(&s->mu);
s->state.state_callback_received[OP_RECV_MESSAGE] = true;
if (count > 0) {
s->state.rs.received_bytes += count;
@@ -448,11 +461,14 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
cronet_bidirectional_stream_read(
s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
s->state.rs.remaining_bytes);
+ gpr_mu_unlock(&s->mu);
} else {
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
} else {
s->state.rs.read_stream_closed = true;
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
}
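
on_read_completed accumulates partial reads: each callback adds count to received_bytes, and while remaining_bytes is non-zero the read is re-issued at the advanced buffer offset. A self-contained sketch of the accounting behind that re-arm-until-full loop (hypothetical names; the real flow is callback-driven rather than a loop):

#include <stddef.h>

typedef struct {
  char *read_buffer;
  size_t received_bytes;
  size_t remaining_bytes;
} read_state;

/* One "read completed" step: account for `count` new bytes and report
 * whether another read must be issued at the advanced offset. */
static int on_partial_read(read_state *rs, size_t count) {
  rs->received_bytes += count;
  rs->remaining_bytes -= count;
  if (rs->remaining_bytes > 0) {
    /* caller re-issues: read(buffer + received_bytes, remaining_bytes) */
    return 1;
  }
  return 0; /* message complete; hand the buffer to the next stage */
}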
@@ -466,6 +482,7 @@ static void on_response_trailers_received(
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
memset(&s->state.rs.trailing_metadata, 0,
sizeof(s->state.rs.trailing_metadata));
s->state.rs.trailing_metadata_valid = false;
@@ -481,6 +498,7 @@ static void on_response_trailers_received(
s->state.rs.trailing_metadata_valid = true;
}
s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -757,14 +775,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
- if (!stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
+ stream_state->state_callback_received[OP_FAILED]) {
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_CANCELLED, NULL);
+ } else {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_NONE, NULL);
- } else {
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_CANCELLED, NULL);
}
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@@ -772,32 +791,40 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_SEND_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
- gpr_slice_buffer write_slice_buffer;
- gpr_slice slice;
- gpr_slice_buffer_init(&write_slice_buffer);
- grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
- stream_op->send_message->length, NULL);
- /* Check that compression flag is OFF. We don't support compression yet. */
- if (stream_op->send_message->flags != 0) {
- gpr_log(GPR_ERROR, "Compression is not supported");
- GPR_ASSERT(stream_op->send_message->flags == 0);
- }
- gpr_slice_buffer_add(&write_slice_buffer, slice);
- if (write_slice_buffer.count != 1) {
- /* Empty request not handled yet */
- gpr_log(GPR_ERROR, "Empty request is not supported");
- GPR_ASSERT(write_slice_buffer.count == 1);
- }
- if (write_slice_buffer.count > 0) {
- size_t write_buffer_size;
- create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
- &write_buffer_size);
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)",
- s->cbs, stream_state->ws.write_buffer);
- stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
- cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
- (int)write_buffer_size, false);
- result = ACTION_TAKEN_WITH_CALLBACK;
+ if (stream_state->state_callback_received[OP_FAILED]) {
+ result = NO_ACTION_POSSIBLE;
+ CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
+ } else {
+ gpr_slice_buffer write_slice_buffer;
+ gpr_slice slice;
+ gpr_slice_buffer_init(&write_slice_buffer);
+ grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
+ stream_op->send_message->length, NULL);
+ /* Check that compression flag is OFF. We don't support compression yet.
+ */
+ if (stream_op->send_message->flags != 0) {
+ gpr_log(GPR_ERROR, "Compression is not supported");
+ GPR_ASSERT(stream_op->send_message->flags == 0);
+ }
+ gpr_slice_buffer_add(&write_slice_buffer, slice);
+ if (write_slice_buffer.count != 1) {
+ /* Empty request not handled yet */
+ gpr_log(GPR_ERROR, "Empty request is not supported");
+ GPR_ASSERT(write_slice_buffer.count == 1);
+ }
+ if (write_slice_buffer.count > 0) {
+ size_t write_buffer_size;
+ create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
+ &write_buffer_size);
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)",
+ s->cbs, stream_state->ws.write_buffer);
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
+ (int)write_buffer_size, false);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else {
+ result = NO_ACTION_POSSIBLE;
+ }
}
stream_state->state_op_done[OP_SEND_MESSAGE] = true;
oas->state.state_op_done[OP_SEND_MESSAGE] = true;
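
create_grpc_frame (outside this hunk) wraps the slice in gRPC's length-prefixed wire format: a 1-byte compressed flag followed by a 4-byte big-endian payload length. A self-contained sketch of building such a frame, assuming the uncompressed case that the assertion above enforces:

#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Build a gRPC length-prefixed frame: |flag:1|length:4 BE|payload|.
 * The compressed flag is 0 because the transport above asserts that
 * send_message->flags == 0 (no compression support yet). */
static uint8_t *make_grpc_frame(const uint8_t *payload, uint32_t len,
                                size_t *frame_len) {
  uint8_t *frame = malloc(5 + (size_t)len);
  if (frame == NULL) return NULL;
  frame[0] = 0; /* uncompressed */
  frame[1] = (uint8_t)(len >> 24);
  frame[2] = (uint8_t)(len >> 16);
  frame[3] = (uint8_t)(len >> 8);
  frame[4] = (uint8_t)(len);
  memcpy(frame + 5, payload, len);
  *frame_len = 5 + (size_t)len;
  return frame;
}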
@@ -805,7 +832,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
- if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
+ stream_state->state_callback_received[OP_FAILED]) {
+ CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_CANCELLED, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
@@ -861,8 +890,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
true; /* Indicates that at least one read request has been made */
cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else {
+ result = NO_ACTION_POSSIBLE;
}
- result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_state->rs.remaining_bytes == 0) {
CRONET_LOG(GPR_DEBUG, "read operation complete");
gpr_slice read_data_slice =
@@ -903,11 +934,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_SEND_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs);
- stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
- cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ if (stream_state->state_callback_received[OP_FAILED]) {
+ result = NO_ACTION_POSSIBLE;
+ CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
+ } else {
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)",
+ s->cbs);
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ }
stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
- result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_op->cancel_error &&
op_can_be_run(stream_op, stream_state, &oas->state,
OP_CANCEL_ERROR)) {