aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/lb_policy/grpclb/grpclb.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/lb_policy/grpclb/grpclb.c')
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c140
1 files changed, 84 insertions, 56 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 4262d2b9a4..97f98df03a 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -106,6 +106,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include "src/core/ext/client_channel/client_channel.h"
#include "src/core/ext/client_channel/client_channel_factory.h"
#include "src/core/ext/client_channel/lb_policy_factory.h"
#include "src/core/ext/client_channel/lb_policy_registry.h"
@@ -116,6 +117,7 @@
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/backoff.h"
#include "src/core/lib/support/string.h"
@@ -123,10 +125,11 @@
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/static_metadata.h"
-#define BACKOFF_MULTIPLIER 1.6
-#define BACKOFF_JITTER 0.2
-#define BACKOFF_MIN_SECONDS 10
-#define BACKOFF_MAX_SECONDS 60
+#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
+#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
+#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
int grpc_lb_glb_trace = 0;
@@ -178,8 +181,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
wrapped_rr_closure_arg *wc_arg = arg;
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
- grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
if (wc_arg->rr_policy != NULL) {
/* if *target is NULL, no pick has been made by the RR policy (eg, all
@@ -246,7 +248,8 @@ static void add_pending_pick(pending_pick **root,
pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.free_when_done = pp;
grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
- wrapped_rr_closure, &pp->wrapped_on_complete_arg);
+ wrapped_rr_closure, &pp->wrapped_on_complete_arg,
+ grpc_schedule_on_exec_ctx);
*root = pp;
}
@@ -266,7 +269,8 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
pping->wrapped_notify_arg.free_when_done = pping;
pping->next = *root;
grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
- wrapped_rr_closure, &pping->wrapped_notify_arg);
+ wrapped_rr_closure, &pping->wrapped_notify_arg,
+ grpc_schedule_on_exec_ctx);
*root = pping;
}
@@ -323,6 +327,9 @@ typedef struct glb_lb_policy {
/* A response from the LB server has been received. Process it */
grpc_closure lb_on_response_received;
+ /* LB call retry timer callback. */
+ grpc_closure lb_on_call_retry;
+
grpc_call *lb_call; /* streaming call to the LB server, */
grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
@@ -383,8 +390,8 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
static void *lb_token_copy(void *token) {
return token == NULL ? NULL : GRPC_MDELEM_REF(token);
}
-static void lb_token_destroy(void *token) {
- if (token != NULL) GRPC_MDELEM_UNREF(token);
+static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) {
+ if (token != NULL) GRPC_MDELEM_UNREF(exec_ctx, token);
}
static int lb_token_cmp(void *token1, void *token2) {
if (token1 > token2) return 1;
@@ -418,7 +425,7 @@ static void parse_server(const grpc_grpclb_server *server,
/* Returns addresses extracted from \a serverlist. */
static grpc_lb_addresses *process_serverlist_locked(
- const grpc_grpclb_serverlist *serverlist) {
+ grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist) {
size_t num_valid = 0;
/* first pass: count how many are valid in order to allocate the necessary
* memory in a single block */
@@ -454,8 +461,8 @@ static grpc_lb_addresses *process_serverlist_locked(
strnlen(server->load_balance_token, lb_token_max_length);
grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
(uint8_t *)server->load_balance_token, lb_token_length);
- user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN,
- lb_token_mdstr);
+ user_data = grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
} else {
char *uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
@@ -578,7 +585,8 @@ static grpc_lb_policy *create_rr_locked(
grpc_lb_policy_args args;
memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
- grpc_lb_addresses *addresses = process_serverlist_locked(serverlist);
+ grpc_lb_addresses *addresses =
+ process_serverlist_locked(exec_ctx, serverlist);
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
@@ -590,8 +598,8 @@ static grpc_lb_policy *create_rr_locked(
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
GPR_ASSERT(rr != NULL);
- grpc_lb_addresses_destroy(addresses);
- grpc_channel_args_destroy(args.args);
+ grpc_lb_addresses_destroy(exec_ctx, addresses);
+ grpc_channel_args_destroy(exec_ctx, args.args);
return rr;
}
@@ -665,7 +673,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
- rr_connectivity);
+ rr_connectivity, grpc_schedule_on_exec_ctx);
rr_connectivity->glb_policy = glb_policy;
rr_connectivity->state = new_rr_state;
@@ -742,12 +750,6 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
- /* Get server name. */
- const grpc_arg *arg =
- grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
- const char *server_name =
- arg != NULL && arg->type == GRPC_ARG_STRING ? arg->value.string : NULL;
-
/* Count the number of gRPC-LB addresses. There must be at least one.
* TODO(roth): For now, we ignore non-balancer addresses, but in the
* future, we may change the behavior such that we fall back to using
@@ -755,7 +757,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* time, this should be changed to allow a list with no balancer addresses,
* since the resolver might fail to return a balancer address even when
* this is the right LB policy to use. */
- arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_grpclb_addrs = 0;
@@ -767,13 +770,25 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
memset(glb_policy, 0, sizeof(*glb_policy));
+ /* Get server name. */
+ arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
+ GPR_ASSERT(arg != NULL);
+ GPR_ASSERT(arg->type == GRPC_ARG_STRING);
+ grpc_uri *uri = grpc_uri_parse(arg->value.string, true);
+ GPR_ASSERT(uri->path[0] != '\0');
+ glb_policy->server_name =
+ gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
+ glb_policy->server_name);
+ }
+ grpc_uri_destroy(uri);
+
/* All input addresses in addresses come from a resolver that claims
* they are LB services. It's the resolver's responsibility to make sure
- * this
- * policy is only instantiated and used in that case.
+ * this policy is only instantiated and used in that case.
*
* Create a client channel over them to communicate with a LB service */
- glb_policy->server_name = gpr_strdup(server_name);
glb_policy->cc_factory = args->client_channel_factory;
glb_policy->args = grpc_channel_args_copy(args->args);
GPR_ASSERT(glb_policy->cc_factory != NULL);
@@ -817,15 +832,20 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* We need the LB channel to return addresses with is_balancer=false
* so that it does not wind up recursively using the grpclb LB policy,
* as per the special case logic in client_channel.c.
+ *
+ * Finally, we also strip out the channel arg for the server URI,
+ * since that will be different for the LB channel than for the parent
+ * channel. (The client channel factory will re-add this arg with
+ * the right value.)
*/
- static const char *keys_to_remove[] = {GRPC_ARG_LB_POLICY_NAME,
- GRPC_ARG_LB_ADDRESSES};
+ static const char *keys_to_remove[] = {
+ GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI};
grpc_channel_args *new_args = grpc_channel_args_copy_and_remove(
args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove));
glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
exec_ctx, glb_policy->cc_factory, target_uri_str,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
- grpc_channel_args_destroy(new_args);
+ grpc_channel_args_destroy(exec_ctx, new_args);
gpr_free(target_uri_str);
for (size_t i = 0; i < num_grpclb_addrs; i++) {
@@ -851,7 +871,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GPR_ASSERT(glb_policy->pending_picks == NULL);
GPR_ASSERT(glb_policy->pending_pings == NULL);
gpr_free((void *)glb_policy->server_name);
- grpc_channel_args_destroy(glb_policy->args);
+ grpc_channel_args_destroy(exec_ctx, glb_policy->args);
grpc_channel_destroy(glb_policy->lb_channel);
glb_policy->lb_channel = NULL;
grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
@@ -894,15 +914,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_ERROR_NONE);
pp = next;
}
while (pping != NULL) {
pending_ping *next = pping->next;
- grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_ERROR_NONE);
pping = next;
}
}
@@ -918,9 +938,9 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -943,9 +963,9 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1));
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -980,11 +1000,10 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *on_complete) {
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, on_complete,
GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
- "won't work without it. Failing"),
- NULL);
+ "won't work without it. Failing"));
return 0;
}
@@ -1003,7 +1022,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg));
- grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg);
+ grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
+ grpc_schedule_on_exec_ctx);
wc_arg->rr_policy = glb_policy->rr_policy;
wc_arg->target = target;
wc_arg->wrapped_closure = on_complete;
@@ -1074,7 +1094,8 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
-static void lb_call_init_locked(glb_lb_policy *glb_policy) {
+static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->server_name != NULL);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
GPR_ASSERT(!glb_policy->shutting_down);
@@ -1083,7 +1104,7 @@ static void lb_call_init_locked(glb_lb_policy *glb_policy) {
* glb_policy->base.interested_parties, which is comprised of the polling
* entities from \a client_channel. */
glb_policy->lb_call = grpc_channel_create_pollset_set_call(
- glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
+ exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties,
"/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name,
glb_policy->deadline, NULL);
@@ -1096,20 +1117,25 @@ static void lb_call_init_locked(glb_lb_policy *glb_policy) {
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
glb_policy->lb_request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref(request_payload_slice);
+ grpc_slice_unref_internal(exec_ctx, request_payload_slice);
grpc_grpclb_request_destroy(request);
glb_policy->lb_call_status_details = NULL;
glb_policy->lb_call_status_details_capacity = 0;
grpc_closure_init(&glb_policy->lb_on_server_status_received,
- lb_on_server_status_received, glb_policy);
+ lb_on_server_status_received, glb_policy,
+ grpc_schedule_on_exec_ctx);
grpc_closure_init(&glb_policy->lb_on_response_received,
- lb_on_response_received, glb_policy);
-
- gpr_backoff_init(&glb_policy->lb_call_backoff_state, BACKOFF_MULTIPLIER,
- BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000,
- BACKOFF_MAX_SECONDS * 1000);
+ lb_on_response_received, glb_policy,
+ grpc_schedule_on_exec_ctx);
+
+ gpr_backoff_init(&glb_policy->lb_call_backoff_state,
+ GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
+ GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
+ GRPC_GRPCLB_RECONNECT_JITTER,
+ GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
+ GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
}
static void lb_call_destroy_locked(glb_lb_policy *glb_policy) {
@@ -1132,7 +1158,7 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(glb_policy->lb_channel != NULL);
if (glb_policy->shutting_down) return;
- lb_call_init_locked(glb_policy);
+ lb_call_init_locked(exec_ctx, glb_policy);
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)",
@@ -1217,7 +1243,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
grpc_grpclb_response_parse_serverlist(response_slice);
if (serverlist != NULL) {
GPR_ASSERT(glb_policy->lb_call != NULL);
- grpc_slice_unref(response_slice);
+ grpc_slice_unref_internal(exec_ctx, response_slice);
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Serverlist with %lu servers received",
(unsigned long)serverlist->num_servers);
@@ -1261,7 +1287,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
} else { /* serverlist == NULL */
gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
- grpc_slice_unref(response_slice);
+ grpc_slice_unref_internal(exec_ctx, response_slice);
}
if (!glb_policy->shutting_down) {
@@ -1341,8 +1367,10 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
}
}
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
+ grpc_closure_init(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer,
+ glb_policy, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
- lb_call_on_retry_timer, glb_policy, now);
+ &glb_policy->lb_on_call_retry, now);
}
gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,