aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc528
1 files changed, 264 insertions, 264 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 03116b420c..065beb4890 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -130,17 +130,17 @@ grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb");
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
-static grpc_error *initial_metadata_add_lb_token(
- grpc_exec_ctx *exec_ctx, grpc_metadata_batch *initial_metadata,
- grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem lb_token) {
+static grpc_error* initial_metadata_add_lb_token(
+ grpc_exec_ctx* exec_ctx, grpc_metadata_batch* initial_metadata,
+ grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
GPR_ASSERT(lb_token_mdelem_storage != NULL);
GPR_ASSERT(!GRPC_MDISNULL(lb_token));
return grpc_metadata_batch_add_tail(exec_ctx, initial_metadata,
lb_token_mdelem_storage, lb_token);
}
-static void destroy_client_stats(void *arg) {
- grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats *)arg);
+static void destroy_client_stats(void* arg) {
+ grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg);
}
typedef struct wrapped_rr_closure_arg {
@@ -149,42 +149,42 @@ typedef struct wrapped_rr_closure_arg {
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */
- grpc_closure *wrapped_closure;
+ grpc_closure* wrapped_closure;
/* the pick's initial metadata, kept in order to append the LB token for the
* pick */
- grpc_metadata_batch *initial_metadata;
+ grpc_metadata_batch* initial_metadata;
/* the picked target, used to determine which LB token to add to the pick's
* initial metadata */
- grpc_connected_subchannel **target;
+ grpc_connected_subchannel** target;
/* the context to be populated for the subchannel call */
- grpc_call_context_element *context;
+ grpc_call_context_element* context;
/* Stats for client-side load reporting. Note that this holds a
* reference, which must be either passed on via context or unreffed. */
- grpc_grpclb_client_stats *client_stats;
+ grpc_grpclb_client_stats* client_stats;
/* the LB token associated with the pick */
grpc_mdelem lb_token;
/* storage for the lb token initial metadata mdelem */
- grpc_linked_mdelem *lb_token_mdelem_storage;
+ grpc_linked_mdelem* lb_token_mdelem_storage;
/* The RR instance related to the closure */
- grpc_lb_policy *rr_policy;
+ grpc_lb_policy* rr_policy;
/* heap memory to be freed upon closure execution. */
- void *free_when_done;
+ void* free_when_done;
} wrapped_rr_closure_arg;
/* The \a on_complete closure passed as part of the pick requires keeping a
* reference to its associated round robin instance. We wrap this closure in
* order to unref the round robin instance upon its invocation */
-static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- wrapped_rr_closure_arg *wc_arg = (wrapped_rr_closure_arg *)arg;
+static void wrapped_rr_closure(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ wrapped_rr_closure_arg* wc_arg = (wrapped_rr_closure_arg*)arg;
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
@@ -202,7 +202,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_ERROR,
"No LB token for connected subchannel pick %p (from RR "
"instance %p).",
- (void *)*wc_arg->target, (void *)wc_arg->rr_policy);
+ (void*)*wc_arg->target, (void*)wc_arg->rr_policy);
abort();
}
// Pass on client stats via context. Passes ownership of the reference.
@@ -213,7 +213,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
}
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy);
+ gpr_log(GPR_INFO, "Unreffing RR %p", (void*)wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
}
@@ -230,25 +230,25 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
* order to correctly unref the RR policy instance upon completion of the pick.
* See \a wrapped_rr_closure for details. */
typedef struct pending_pick {
- struct pending_pick *next;
+ struct pending_pick* next;
/* original pick()'s arguments */
grpc_lb_policy_pick_args pick_args;
/* output argument where to store the pick()ed connected subchannel, or NULL
* upon error. */
- grpc_connected_subchannel **target;
+ grpc_connected_subchannel** target;
/* args for wrapped_on_complete */
wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
-static void add_pending_pick(pending_pick **root,
- const grpc_lb_policy_pick_args *pick_args,
- grpc_connected_subchannel **target,
- grpc_call_context_element *context,
- grpc_closure *on_complete) {
- pending_pick *pp = (pending_pick *)gpr_zalloc(sizeof(*pp));
+static void add_pending_pick(pending_pick** root,
+ const grpc_lb_policy_pick_args* pick_args,
+ grpc_connected_subchannel** target,
+ grpc_call_context_element* context,
+ grpc_closure* on_complete) {
+ pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp));
pp->next = *root;
pp->pick_args = *pick_args;
pp->target = target;
@@ -267,14 +267,14 @@ static void add_pending_pick(pending_pick **root,
/* Same as the \a pending_pick struct but for ping operations */
typedef struct pending_ping {
- struct pending_ping *next;
+ struct pending_ping* next;
/* args for wrapped_notify */
wrapped_rr_closure_arg wrapped_notify_arg;
} pending_ping;
-static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
- pending_ping *pping = (pending_ping *)gpr_zalloc(sizeof(*pping));
+static void add_pending_ping(pending_ping** root, grpc_closure* notify) {
+ pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
pping->wrapped_notify_arg.wrapped_closure = notify;
pping->wrapped_notify_arg.free_when_done = pping;
pping->next = *root;
@@ -294,9 +294,9 @@ typedef struct glb_lb_policy {
grpc_lb_policy base;
/** who the client is trying to communicate with */
- const char *server_name;
- grpc_client_channel_factory *cc_factory;
- grpc_channel_args *args;
+ const char* server_name;
+ grpc_client_channel_factory* cc_factory;
+ grpc_channel_args* args;
/** timeout in milliseconds for the LB call. 0 means no deadline. */
int lb_call_timeout_ms;
@@ -306,13 +306,13 @@ typedef struct glb_lb_policy {
int lb_fallback_timeout_ms;
/** for communicating with the LB server */
- grpc_channel *lb_channel;
+ grpc_channel* lb_channel;
/** response generator to inject address updates into \a lb_channel */
- grpc_fake_resolver_response_generator *response_generator;
+ grpc_fake_resolver_response_generator* response_generator;
/** the RR policy to use of the backend servers returned by the LB server */
- grpc_lb_policy *rr_policy;
+ grpc_lb_policy* rr_policy;
bool started_picking;
@@ -324,7 +324,7 @@ typedef struct glb_lb_policy {
/** stores the deserialized response from the LB. May be NULL until one such
* response has arrived. */
- grpc_grpclb_serverlist *serverlist;
+ grpc_grpclb_serverlist* serverlist;
/** Index into serverlist for next pick.
* If the server at this index is a drop, we return a drop.
@@ -332,13 +332,13 @@ typedef struct glb_lb_policy {
size_t serverlist_index;
/** stores the backend addresses from the resolver */
- grpc_lb_addresses *fallback_backend_addresses;
+ grpc_lb_addresses* fallback_backend_addresses;
/** list of picks that are waiting on RR's policy connectivity */
- pending_pick *pending_picks;
+ pending_pick* pending_picks;
/** list of pings that are waiting on RR's policy connectivity */
- pending_ping *pending_pings;
+ pending_ping* pending_pings;
bool shutting_down;
@@ -373,7 +373,7 @@ typedef struct glb_lb_policy {
/* LB fallback timer callback. */
grpc_closure lb_on_fallback;
- grpc_call *lb_call; /* streaming call to the LB server, */
+ grpc_call* lb_call; /* streaming call to the LB server, */
grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
grpc_metadata_array
@@ -381,10 +381,10 @@ typedef struct glb_lb_policy {
/* what's being sent to the LB server. Note that its value may vary if the LB
* server indicates a redirect. */
- grpc_byte_buffer *lb_request_payload;
+ grpc_byte_buffer* lb_request_payload;
/* response the LB server, if any. Processed in lb_on_response_received() */
- grpc_byte_buffer *lb_response_payload;
+ grpc_byte_buffer* lb_response_payload;
/* call status code and details, set in lb_on_server_status_received() */
grpc_status_code lb_call_status;
@@ -403,7 +403,7 @@ typedef struct glb_lb_policy {
/* Stats for client-side load reporting. Should be unreffed and
* recreated whenever lb_call is replaced. */
- grpc_grpclb_client_stats *client_stats;
+ grpc_grpclb_client_stats* client_stats;
/* Interval and timer for next client load report. */
grpc_millis client_stats_report_interval;
grpc_timer client_load_report_timer;
@@ -413,20 +413,20 @@ typedef struct glb_lb_policy {
* completion of sending the load report. */
grpc_closure client_load_report_closure;
/* Client load report message payload. */
- grpc_byte_buffer *client_load_report_payload;
+ grpc_byte_buffer* client_load_report_payload;
} glb_lb_policy;
/* Keeps track and reacts to changes in connectivity of the RR instance */
struct rr_connectivity_data {
grpc_closure on_change;
grpc_connectivity_state state;
- glb_lb_policy *glb_policy;
+ glb_lb_policy* glb_policy;
};
-static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
+static bool is_server_valid(const grpc_grpclb_server* server, size_t idx,
bool log) {
if (server->drop) return false;
- const grpc_grpclb_ip_address *ip = &server->ip_address;
+ const grpc_grpclb_ip_address* ip = &server->ip_address;
if (server->port >> 16 != 0) {
if (log) {
gpr_log(GPR_ERROR,
@@ -448,17 +448,17 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
}
/* vtable for LB tokens in grpc_lb_addresses. */
-static void *lb_token_copy(void *token) {
+static void* lb_token_copy(void* token) {
return token == NULL
? NULL
- : (void *)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
+ : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
}
-static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) {
+static void lb_token_destroy(grpc_exec_ctx* exec_ctx, void* token) {
if (token != NULL) {
GRPC_MDELEM_UNREF(exec_ctx, grpc_mdelem{(uintptr_t)token});
}
}
-static int lb_token_cmp(void *token1, void *token2) {
+static int lb_token_cmp(void* token1, void* token2) {
if (token1 > token2) return 1;
if (token1 < token2) return -1;
return 0;
@@ -466,23 +466,23 @@ static int lb_token_cmp(void *token1, void *token2) {
static const grpc_lb_user_data_vtable lb_token_vtable = {
lb_token_copy, lb_token_destroy, lb_token_cmp};
-static void parse_server(const grpc_grpclb_server *server,
- grpc_resolved_address *addr) {
+static void parse_server(const grpc_grpclb_server* server,
+ grpc_resolved_address* addr) {
memset(addr, 0, sizeof(*addr));
if (server->drop) return;
const uint16_t netorder_port = htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
- const grpc_grpclb_ip_address *ip = &server->ip_address;
+ const grpc_grpclb_ip_address* ip = &server->ip_address;
if (ip->size == 4) {
addr->len = sizeof(struct sockaddr_in);
- struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr;
+ struct sockaddr_in* addr4 = (struct sockaddr_in*)&addr->addr;
addr4->sin_family = AF_INET;
memcpy(&addr4->sin_addr, ip->bytes, ip->size);
addr4->sin_port = netorder_port;
} else if (ip->size == 16) {
addr->len = sizeof(struct sockaddr_in6);
- struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr->addr;
+ struct sockaddr_in6* addr6 = (struct sockaddr_in6*)&addr->addr;
addr6->sin6_family = AF_INET6;
memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
addr6->sin6_port = netorder_port;
@@ -490,15 +490,15 @@ static void parse_server(const grpc_grpclb_server *server,
}
/* Returns addresses extracted from \a serverlist. */
-static grpc_lb_addresses *process_serverlist_locked(
- grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist) {
+static grpc_lb_addresses* process_serverlist_locked(
+ grpc_exec_ctx* exec_ctx, const grpc_grpclb_serverlist* serverlist) {
size_t num_valid = 0;
/* first pass: count how many are valid in order to allocate the necessary
* memory in a single block */
for (size_t i = 0; i < serverlist->num_servers; ++i) {
if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
}
- grpc_lb_addresses *lb_addresses =
+ grpc_lb_addresses* lb_addresses =
grpc_lb_addresses_create(num_valid, &lb_token_vtable);
/* second pass: actually populate the addresses and LB tokens (aka user data
* to the outside world) to be read by the RR policy during its creation.
@@ -507,14 +507,14 @@ static grpc_lb_addresses *process_serverlist_locked(
* incurr in an allocation due to the arbitrary number of server */
size_t addr_idx = 0;
for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
- const grpc_grpclb_server *server = serverlist->servers[sl_idx];
+ const grpc_grpclb_server* server = serverlist->servers[sl_idx];
if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
GPR_ASSERT(addr_idx < num_valid);
/* address processing */
grpc_resolved_address addr;
parse_server(server, &addr);
/* lb token processing */
- void *user_data;
+ void* user_data;
if (server->has_load_balance_token) {
const size_t lb_token_max_length =
GPR_ARRAY_SIZE(server->load_balance_token);
@@ -522,17 +522,17 @@ static grpc_lb_addresses *process_serverlist_locked(
strnlen(server->load_balance_token, lb_token_max_length);
grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
server->load_balance_token, lb_token_length);
- user_data = (void *)grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_LB_TOKEN,
- lb_token_mdstr)
+ user_data = (void*)grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_LB_TOKEN,
+ lb_token_mdstr)
.payload;
} else {
- char *uri = grpc_sockaddr_to_uri(&addr);
+ char* uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
"Missing LB token for backend address '%s'. The empty token will "
"be used instead",
uri);
gpr_free(uri);
- user_data = (void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
+ user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
}
grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
@@ -545,8 +545,8 @@ static grpc_lb_addresses *process_serverlist_locked(
}
/* Returns the backend addresses extracted from the given addresses */
-static grpc_lb_addresses *extract_backend_addresses_locked(
- grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses) {
+static grpc_lb_addresses* extract_backend_addresses_locked(
+ grpc_exec_ctx* exec_ctx, const grpc_lb_addresses* addresses) {
/* first pass: count the number of backend addresses */
size_t num_backends = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
@@ -555,24 +555,24 @@ static grpc_lb_addresses *extract_backend_addresses_locked(
}
}
/* second pass: actually populate the addresses and (empty) LB tokens */
- grpc_lb_addresses *backend_addresses =
+ grpc_lb_addresses* backend_addresses =
grpc_lb_addresses_create(num_backends, &lb_token_vtable);
size_t num_copied = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) continue;
- const grpc_resolved_address *addr = &addresses->addresses[i].address;
+ const grpc_resolved_address* addr = &addresses->addresses[i].address;
grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
addr->len, false /* is_balancer */,
NULL /* balancer_name */,
- (void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
+ (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
++num_copied;
}
return backend_addresses;
}
static void update_lb_connectivity_status_locked(
- grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
- grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
+ grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
+ grpc_connectivity_state rr_state, grpc_error* rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
@@ -620,7 +620,7 @@ static void update_lb_connectivity_status_locked(
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(
GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.",
- grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy);
+ grpc_connectivity_state_name(rr_state), (void*)glb_policy->rr_policy);
}
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state,
rr_state_error,
@@ -633,13 +633,13 @@ static void update_lb_connectivity_status_locked(
* If \a force_async is true, then we will manually schedule the
* completion callback even if the pick is available immediately. */
static bool pick_from_internal_rr_locked(
- grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
- const grpc_lb_policy_pick_args *pick_args, bool force_async,
- grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
+ grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
+ const grpc_lb_policy_pick_args* pick_args, bool force_async,
+ grpc_connected_subchannel** target, wrapped_rr_closure_arg* wc_arg) {
// Check for drops if we are not using fallback backend addresses.
if (glb_policy->serverlist != NULL) {
// Look at the index into the serverlist to see if we should drop this call.
- grpc_grpclb_server *server =
+ grpc_grpclb_server* server =
glb_policy->serverlist->servers[glb_policy->serverlist_index++];
if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
glb_policy->serverlist_index = 0; // Wrap-around.
@@ -672,7 +672,7 @@ static bool pick_from_internal_rr_locked(
// Pick via the RR policy.
const bool pick_done = grpc_lb_policy_pick_locked(
exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context,
- (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
+ (void**)&wc_arg->lb_token, &wc_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
@@ -703,9 +703,9 @@ static bool pick_from_internal_rr_locked(
return pick_done;
}
-static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
- grpc_lb_addresses *addresses;
+static grpc_lb_policy_args* lb_policy_args_create(grpc_exec_ctx* exec_ctx,
+ glb_lb_policy* glb_policy) {
+ grpc_lb_addresses* addresses;
if (glb_policy->serverlist != NULL) {
GPR_ASSERT(glb_policy->serverlist->num_servers > 0);
addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist);
@@ -718,12 +718,12 @@ static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses);
}
GPR_ASSERT(addresses != NULL);
- grpc_lb_policy_args *args = (grpc_lb_policy_args *)gpr_zalloc(sizeof(*args));
+ grpc_lb_policy_args* args = (grpc_lb_policy_args*)gpr_zalloc(sizeof(*args));
args->client_channel_factory = glb_policy->cc_factory;
args->combiner = glb_policy->base.combiner;
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
- static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
+ static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
args->args = grpc_channel_args_copy_and_add_and_remove(
glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
@@ -732,19 +732,19 @@ static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
return args;
}
-static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy_args *args) {
+static void lb_policy_args_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_lb_policy_args* args) {
grpc_channel_args_destroy(exec_ctx, args->args);
gpr_free(args);
}
-static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *error);
-static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
- grpc_lb_policy_args *args) {
+static void glb_rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx,
+ void* arg, grpc_error* error);
+static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
+ grpc_lb_policy_args* args) {
GPR_ASSERT(glb_policy->rr_policy == NULL);
- grpc_lb_policy *new_rr_policy =
+ grpc_lb_policy* new_rr_policy =
grpc_lb_policy_create(exec_ctx, "round_robin", args);
if (new_rr_policy == NULL) {
gpr_log(GPR_ERROR,
@@ -753,11 +753,11 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
"to be used. Future updates from the LB will attempt to create new "
"instances.",
(unsigned long)glb_policy->serverlist->num_servers,
- (void *)glb_policy->rr_policy);
+ (void*)glb_policy->rr_policy);
return;
}
glb_policy->rr_policy = new_rr_policy;
- grpc_error *rr_state_error = NULL;
+ grpc_error* rr_state_error = NULL;
const grpc_connectivity_state rr_state =
grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
&rr_state_error);
@@ -773,8 +773,8 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
/* Allocate the data for the tracking of the new RR policy's connectivity.
* It'll be deallocated in glb_rr_connectivity_changed() */
- rr_connectivity_data *rr_connectivity =
- (rr_connectivity_data *)gpr_zalloc(sizeof(rr_connectivity_data));
+ rr_connectivity_data* rr_connectivity =
+ (rr_connectivity_data*)gpr_zalloc(sizeof(rr_connectivity_data));
GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
glb_rr_connectivity_changed_locked, rr_connectivity,
grpc_combiner_scheduler(glb_policy->base.combiner));
@@ -789,7 +789,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy);
/* Update picks and pings in wait */
- pending_pick *pp;
+ pending_pick* pp;
while ((pp = glb_policy->pending_picks)) {
glb_policy->pending_picks = pp->next;
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
@@ -798,14 +798,14 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_grpclb_client_stats_ref(glb_policy->client_stats);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p",
- (void *)glb_policy->rr_policy);
+ (void*)glb_policy->rr_policy);
}
pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args,
true /* force_async */, pp->target,
&pp->wrapped_on_complete_arg);
}
- pending_ping *pping;
+ pending_ping* pping;
while ((pping = glb_policy->pending_pings)) {
glb_policy->pending_pings = pping->next;
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
@@ -820,31 +820,31 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
}
/* glb_policy->rr_policy may be NULL (initial handover) */
-static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void rr_handover_locked(grpc_exec_ctx* exec_ctx,
+ glb_lb_policy* glb_policy) {
if (glb_policy->shutting_down) return;
- grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
+ grpc_lb_policy_args* args = lb_policy_args_create(exec_ctx, glb_policy);
GPR_ASSERT(args != NULL);
if (glb_policy->rr_policy != NULL) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)",
- (void *)glb_policy->rr_policy);
+ (void*)glb_policy->rr_policy);
}
grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args);
} else {
create_rr_locked(exec_ctx, glb_policy, args);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)",
- (void *)glb_policy->rr_policy);
+ (void*)glb_policy->rr_policy);
}
}
lb_policy_args_destroy(exec_ctx, args);
}
-static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *error) {
- rr_connectivity_data *rr_connectivity = (rr_connectivity_data *)arg;
- glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
+static void glb_rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx,
+ void* arg, grpc_error* error) {
+ rr_connectivity_data* rr_connectivity = (rr_connectivity_data*)arg;
+ glb_lb_policy* glb_policy = rr_connectivity->glb_policy;
if (glb_policy->shutting_down) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"glb_rr_connectivity_cb");
@@ -872,22 +872,22 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
&rr_connectivity->on_change);
}
-static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
- void *balancer_name) {
+static void destroy_balancer_name(grpc_exec_ctx* exec_ctx,
+ void* balancer_name) {
gpr_free(balancer_name);
}
static grpc_slice_hash_table_entry targets_info_entry_create(
- const char *address, const char *balancer_name) {
+ const char* address, const char* balancer_name) {
grpc_slice_hash_table_entry entry;
entry.key = grpc_slice_from_copied_string(address);
entry.value = gpr_strdup(balancer_name);
return entry;
}
-static int balancer_name_cmp_fn(void *a, void *b) {
- const char *a_str = (const char *)a;
- const char *b_str = (const char *)b;
+static int balancer_name_cmp_fn(void* a, void* b) {
+ const char* a_str = (const char*)a;
+ const char* b_str = (const char*)b;
return strcmp(a_str, b_str);
}
@@ -899,10 +899,10 @@ static int balancer_name_cmp_fn(void *a, void *b) {
* - \a response_generator: in order to propagate updates from the resolver
* above the grpclb policy.
* - \a args: other args inherited from the grpclb policy. */
-static grpc_channel_args *build_lb_channel_args(
- grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses,
- grpc_fake_resolver_response_generator *response_generator,
- const grpc_channel_args *args) {
+static grpc_channel_args* build_lb_channel_args(
+ grpc_exec_ctx* exec_ctx, const grpc_lb_addresses* addresses,
+ grpc_fake_resolver_response_generator* response_generator,
+ const grpc_channel_args* args) {
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
@@ -911,11 +911,11 @@ static grpc_channel_args *build_lb_channel_args(
* It's the resolver's responsibility to make sure this policy is only
* instantiated and used in that case. Otherwise, something has gone wrong. */
GPR_ASSERT(num_grpclb_addrs > 0);
- grpc_lb_addresses *lb_addresses =
+ grpc_lb_addresses* lb_addresses =
grpc_lb_addresses_create(num_grpclb_addrs, NULL);
- grpc_slice_hash_table_entry *targets_info_entries =
- (grpc_slice_hash_table_entry *)gpr_zalloc(sizeof(*targets_info_entries) *
- num_grpclb_addrs);
+ grpc_slice_hash_table_entry* targets_info_entries =
+ (grpc_slice_hash_table_entry*)gpr_zalloc(sizeof(*targets_info_entries) *
+ num_grpclb_addrs);
size_t lb_addresses_idx = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
@@ -924,7 +924,7 @@ static grpc_channel_args *build_lb_channel_args(
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
- char *addr_str;
+ char* addr_str;
GPR_ASSERT(grpc_sockaddr_to_string(
&addr_str, &addresses->addresses[i].address, true) > 0);
targets_info_entries[lb_addresses_idx] = targets_info_entry_create(
@@ -937,19 +937,19 @@ static grpc_channel_args *build_lb_channel_args(
addresses->addresses[i].balancer_name, NULL /* user data */);
}
GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
- grpc_slice_hash_table *targets_info =
+ grpc_slice_hash_table* targets_info =
grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries,
destroy_balancer_name, balancer_name_cmp_fn);
gpr_free(targets_info_entries);
- grpc_channel_args *lb_channel_args =
+ grpc_channel_args* lb_channel_args =
grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info,
response_generator, args);
grpc_arg lb_channel_addresses_arg =
grpc_lb_addresses_create_channel_arg(lb_addresses);
- grpc_channel_args *result = grpc_channel_args_copy_and_add(
+ grpc_channel_args* result = grpc_channel_args_copy_and_add(
lb_channel_args, &lb_channel_addresses_arg, 1);
grpc_slice_hash_table_unref(exec_ctx, targets_info);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
@@ -957,11 +957,11 @@ static grpc_channel_args *build_lb_channel_args(
return result;
}
-static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
GPR_ASSERT(glb_policy->pending_picks == NULL);
GPR_ASSERT(glb_policy->pending_pings == NULL);
- gpr_free((void *)glb_policy->server_name);
+ gpr_free((void*)glb_policy->server_name);
grpc_channel_args_destroy(exec_ctx, glb_policy->args);
if (glb_policy->client_stats != NULL) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
@@ -978,14 +978,14 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(glb_policy);
}
-static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
glb_policy->shutting_down = true;
/* We need a copy of the lb_call pointer because we can't cancell the call
* while holding glb_policy->mu: lb_on_server_status_received, invoked due to
* the cancel, needs to acquire that same lock */
- grpc_call *lb_call = glb_policy->lb_call;
+ grpc_call* lb_call = glb_policy->lb_call;
/* glb_policy->lb_call and this local lb_call must be consistent at this point
* because glb_policy->lb_call is only assigned in lb_call_init_locked as part
@@ -1004,9 +1004,9 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_policy->fallback_timer_active = false;
}
- pending_pick *pp = glb_policy->pending_picks;
+ pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = NULL;
- pending_ping *pping = glb_policy->pending_pings;
+ pending_ping* pping = glb_policy->pending_pings;
glb_policy->pending_pings = NULL;
if (glb_policy->rr_policy != NULL) {
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
@@ -1024,7 +1024,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
while (pp != NULL) {
- pending_pick *next = pp->next;
+ pending_pick* next = pp->next;
*pp->target = NULL;
GRPC_CLOSURE_SCHED(
exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
@@ -1034,7 +1034,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
while (pping != NULL) {
- pending_ping *next = pping->next;
+ pending_ping* next = pping->next;
GRPC_CLOSURE_SCHED(
exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
@@ -1053,14 +1053,14 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
// - Otherwise, without an RR instance, picks stay pending at this policy's
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
// we invoke the completion closure and set *target to NULL right here.
-static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target,
- grpc_error *error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- pending_pick *pp = glb_policy->pending_picks;
+static void glb_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+ grpc_connected_subchannel** target,
+ grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
+ pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = NULL;
while (pp != NULL) {
- pending_pick *next = pp->next;
+ pending_pick* next = pp->next;
if (pp->target == target) {
*target = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
@@ -1089,16 +1089,16 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
// - Otherwise, without an RR instance, picks stay pending at this policy's
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
// we invoke the completion closure and set *target to NULL right here.
-static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
+static void glb_cancel_picks_locked(grpc_exec_ctx* exec_ctx,
+ grpc_lb_policy* pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
- grpc_error *error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- pending_pick *pp = glb_policy->pending_picks;
+ grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
+ pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = NULL;
while (pp != NULL) {
- pending_pick *next = pp->next;
+ pending_pick* next = pp->next;
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
@@ -1118,12 +1118,12 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy);
-static void start_picking_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error);
+static void query_for_backends_locked(grpc_exec_ctx* exec_ctx,
+ glb_lb_policy* glb_policy);
+static void start_picking_locked(grpc_exec_ctx* exec_ctx,
+ glb_lb_policy* glb_policy) {
/* start a timer to fall back */
if (glb_policy->lb_fallback_timeout_ms > 0 &&
glb_policy->serverlist == NULL && !glb_policy->fallback_timer_active) {
@@ -1143,18 +1143,18 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx,
query_for_backends_locked(exec_ctx, glb_policy);
}
-static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+static void glb_exit_idle_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
if (!glb_policy->started_picking) {
start_picking_locked(exec_ctx, glb_policy);
}
}
-static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- const grpc_lb_policy_pick_args *pick_args,
- grpc_connected_subchannel **target,
- grpc_call_context_element *context, void **user_data,
- grpc_closure *on_complete) {
+static int glb_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+ const grpc_lb_policy_pick_args* pick_args,
+ grpc_connected_subchannel** target,
+ grpc_call_context_element* context, void** user_data,
+ grpc_closure* on_complete) {
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, on_complete,
@@ -1164,18 +1164,18 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
return 0;
}
- glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+ glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
bool pick_done;
if (glb_policy->rr_policy != NULL) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p",
- (void *)glb_policy, (void *)glb_policy->rr_policy);
+ gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p", (void*)glb_policy,
+ (void*)glb_policy->rr_policy);
}
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
- wrapped_rr_closure_arg *wc_arg =
- (wrapped_rr_closure_arg *)gpr_zalloc(sizeof(wrapped_rr_closure_arg));
+ wrapped_rr_closure_arg* wc_arg =
+ (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(wrapped_rr_closure_arg));
GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
grpc_schedule_on_exec_ctx);
@@ -1197,7 +1197,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_log(GPR_DEBUG,
"No RR policy in grpclb instance %p. Adding to grpclb's pending "
"picks",
- (void *)(glb_policy));
+ (void*)(glb_policy));
}
add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
on_complete);
@@ -1211,16 +1211,16 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static grpc_connectivity_state glb_check_connectivity_locked(
- grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_error **connectivity_error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+ grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+ grpc_error** connectivity_error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
return grpc_connectivity_state_get(&glb_policy->state_tracker,
connectivity_error);
}
-static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_closure *closure) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+static void glb_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
+ grpc_closure* closure) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
if (glb_policy->rr_policy) {
grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure);
} else {
@@ -1231,23 +1231,23 @@ static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
-static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
- grpc_connectivity_state *current,
- grpc_closure *notify) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+static void glb_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx,
+ grpc_lb_policy* pol,
+ grpc_connectivity_state* current,
+ grpc_closure* notify) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &glb_policy->state_tracker, current, notify);
}
-static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
+static void lb_call_on_retry_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
glb_policy->retry_timer_active = false;
if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
- (void *)glb_policy);
+ (void*)glb_policy);
}
GPR_ASSERT(glb_policy->lb_call == NULL);
query_for_backends_locked(exec_ctx, glb_policy);
@@ -1255,8 +1255,8 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
}
-static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx,
+ glb_lb_policy* glb_policy) {
if (glb_policy->started_picking && glb_policy->updating_lb_call) {
if (glb_policy->retry_timer_active) {
grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
@@ -1270,7 +1270,7 @@ static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx,
.next_attempt_start_time;
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
- (void *)glb_policy);
+ (void*)glb_policy);
grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx);
if (timeout > 0) {
gpr_log(GPR_DEBUG, "... retry_timer_active in %" PRIdPTR "ms.",
@@ -1291,11 +1291,11 @@ static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx,
"lb_on_server_status_received_locked");
}
-static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
+static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error);
-static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void schedule_next_client_load_report(grpc_exec_ctx* exec_ctx,
+ glb_lb_policy* glb_policy) {
const grpc_millis next_client_load_report_time =
grpc_exec_ctx_now(exec_ctx) + glb_policy->client_stats_report_interval;
GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
@@ -1306,9 +1306,9 @@ static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx,
&glb_policy->client_load_report_closure);
}
-static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
+static void client_load_report_done_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
glb_policy->client_load_report_payload = NULL;
if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) {
@@ -1320,9 +1320,9 @@ static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
schedule_next_client_load_report(exec_ctx, glb_policy);
}
-static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
- grpc_grpclb_dropped_call_counts *drop_entries =
- (grpc_grpclb_dropped_call_counts *)
+static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
+ grpc_grpclb_dropped_call_counts* drop_entries =
+ (grpc_grpclb_dropped_call_counts*)
request->client_stats.calls_finished_with_drop.arg;
return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 &&
@@ -1332,9 +1332,9 @@ static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
(drop_entries == NULL || drop_entries->num_entries == 0);
}
-static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
+static void send_client_load_report_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) {
glb_policy->client_load_report_timer_pending = false;
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
@@ -1346,7 +1346,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
// Construct message payload.
GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
- grpc_grpclb_request *request =
+ grpc_grpclb_request* request =
grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
@@ -1382,12 +1382,12 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
}
-static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *error);
-static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx,
+ void* arg, grpc_error* error);
+static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error);
+static void lb_call_init_locked(grpc_exec_ctx* exec_ctx,
+ glb_lb_policy* glb_policy) {
GPR_ASSERT(glb_policy->server_name != NULL);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
GPR_ASSERT(glb_policy->lb_call == NULL);
@@ -1416,7 +1416,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
- grpc_grpclb_request *request =
+ grpc_grpclb_request* request =
grpc_grpclb_request_create(glb_policy->server_name);
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
glb_policy->lb_request_payload =
@@ -1442,8 +1442,8 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
glb_policy->last_client_load_report_counters_were_zero = false;
}
-static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void lb_call_destroy_locked(grpc_exec_ctx* exec_ctx,
+ glb_lb_policy* glb_policy) {
GPR_ASSERT(glb_policy->lb_call != NULL);
grpc_call_unref(glb_policy->lb_call);
glb_policy->lb_call = NULL;
@@ -1462,8 +1462,8 @@ static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
/*
* Auxiliary functions and LB client callbacks.
*/
-static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
+static void query_for_backends_locked(grpc_exec_ctx* exec_ctx,
+ glb_lb_policy* glb_policy) {
GPR_ASSERT(glb_policy->lb_channel != NULL);
if (glb_policy->shutting_down) return;
@@ -1472,8 +1472,8 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)",
- (void *)glb_policy, (void *)glb_policy->lb_channel,
- (void *)glb_policy->lb_call);
+ (void*)glb_policy, (void*)glb_policy->lb_channel,
+ (void*)glb_policy->lb_call);
}
GPR_ASSERT(glb_policy->lb_call != NULL);
@@ -1481,7 +1481,7 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
grpc_op ops[3];
memset(ops, 0, sizeof(ops));
- grpc_op *op = ops;
+ grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
@@ -1537,12 +1537,12 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
-static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
+static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
- grpc_op *op = ops;
+ grpc_op* op = ops;
if (glb_policy->lb_response_payload != NULL) {
grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
/* Received data from the LB server. Look inside
@@ -1553,7 +1553,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_byte_buffer_reader_destroy(&bbr);
grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
- grpc_grpclb_initial_response *response = NULL;
+ grpc_grpclb_initial_response* response = NULL;
if (!glb_policy->seen_initial_response &&
(response = grpc_grpclb_initial_response_parse(response_slice)) !=
NULL) {
@@ -1581,7 +1581,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_grpclb_initial_response_destroy(response);
glb_policy->seen_initial_response = true;
} else {
- grpc_grpclb_serverlist *serverlist =
+ grpc_grpclb_serverlist* serverlist =
grpc_grpclb_response_parse_serverlist(response_slice);
if (serverlist != NULL) {
GPR_ASSERT(glb_policy->lb_call != NULL);
@@ -1591,7 +1591,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
for (size_t i = 0; i < serverlist->num_servers; ++i) {
grpc_resolved_address addr;
parse_server(serverlist->servers[i], &addr);
- char *ipport;
+ char* ipport;
grpc_sockaddr_to_string(&ipport, &addr, false);
gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
gpr_free(ipport);
@@ -1664,9 +1664,9 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
}
-static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
+static void lb_on_fallback_timer_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
glb_policy->fallback_timer_active = false;
/* If we receive a serverlist after the timer fires but before this callback
* actually runs, don't fall back. */
@@ -1675,7 +1675,7 @@ static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"Falling back to use backends from resolver (grpclb %p)",
- (void *)glb_policy);
+ (void*)glb_policy);
}
GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
rr_handover_locked(exec_ctx, glb_policy);
@@ -1685,18 +1685,18 @@ static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
"grpclb_fallback_timer");
}
-static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
+static void lb_on_server_status_received_locked(grpc_exec_ctx* exec_ctx,
+ void* arg, grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
GPR_ASSERT(glb_policy->lb_call != NULL);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- char *status_details =
+ char* status_details =
grpc_slice_to_c_string(glb_policy->lb_call_status_details);
gpr_log(GPR_INFO,
"Status from LB server received. Status = %d, Details = '%s', "
"(call: %p), error %p",
glb_policy->lb_call_status, status_details,
- (void *)glb_policy->lb_call, (void *)error);
+ (void*)glb_policy->lb_call, (void*)error);
gpr_free(status_details);
}
/* We need to perform cleanups no matter what. */
@@ -1709,9 +1709,9 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
}
}
-static void fallback_update_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy,
- const grpc_lb_addresses *addresses) {
+static void fallback_update_locked(grpc_exec_ctx* exec_ctx,
+ glb_lb_policy* glb_policy,
+ const grpc_lb_addresses* addresses) {
GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses =
@@ -1722,10 +1722,10 @@ static void fallback_update_locked(grpc_exec_ctx *exec_ctx,
}
}
-static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- const grpc_lb_policy_args *args) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
- const grpc_arg *arg =
+static void glb_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+ const grpc_lb_policy_args* args) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)policy;
+ const grpc_arg* arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
if (glb_policy->lb_channel == NULL) {
@@ -1740,12 +1740,12 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
gpr_log(GPR_ERROR,
"No valid LB addresses channel arg for grpclb %p update, "
"ignoring.",
- (void *)glb_policy);
+ (void*)glb_policy);
}
return;
}
- const grpc_lb_addresses *addresses =
- (const grpc_lb_addresses *)arg->value.pointer.p;
+ const grpc_lb_addresses* addresses =
+ (const grpc_lb_addresses*)arg->value.pointer.p;
// If a non-empty serverlist hasn't been received from the balancer,
// propagate the update to fallback_backend_addresses.
if (glb_policy->serverlist == NULL) {
@@ -1754,7 +1754,7 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
GPR_ASSERT(glb_policy->lb_channel != NULL);
// Propagate updates to the LB channel (pick_first) through the fake
// resolver.
- grpc_channel_args *lb_channel_args = build_lb_channel_args(
+ grpc_channel_args* lb_channel_args = build_lb_channel_args(
exec_ctx, addresses, glb_policy->response_generator, args->args);
grpc_fake_resolver_response_generator_set_response(
exec_ctx, glb_policy->response_generator, lb_channel_args);
@@ -1764,7 +1764,7 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
if (!glb_policy->watching_lb_channel) {
glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
glb_policy->lb_channel, true /* try to connect */);
- grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(glb_policy->lb_channel));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
glb_policy->watching_lb_channel = true;
@@ -1781,10 +1781,10 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
// Invoked as part of the update process. It continues watching the LB channel
// until it shuts down or becomes READY. It's invoked even if the LB channel
// stayed READY throughout the update (for example if the update is identical).
-static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
+static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx,
+ void* arg,
+ grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (glb_policy->shutting_down) goto done;
// Re-initialize the lb_call. This should also take care of updating the
// embedded RR policy. Note that the current RR policy, if any, will stay in
@@ -1793,7 +1793,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
/* resub. */
- grpc_channel_element *client_channel_elem =
+ grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(glb_policy->lb_channel));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
@@ -1845,29 +1845,29 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_notify_on_state_change_locked,
glb_update_locked};
-static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy_factory *factory,
- grpc_lb_policy_args *args) {
+static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx,
+ grpc_lb_policy_factory* factory,
+ grpc_lb_policy_args* args) {
/* Count the number of gRPC-LB addresses. There must be at least one. */
- const grpc_arg *arg =
+ const grpc_arg* arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
return NULL;
}
- grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
+ grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p;
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
}
if (num_grpclb_addrs == 0) return NULL;
- glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy));
+ glb_lb_policy* glb_policy = (glb_lb_policy*)gpr_zalloc(sizeof(*glb_policy));
/* Get server name. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
- grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
+ grpc_uri* uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
@@ -1891,8 +1891,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
grpc_arg new_arg = grpc_channel_arg_string_create(
- (char *)GRPC_ARG_LB_POLICY_NAME, (char *)"grpclb");
- static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
+ (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
+ static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
@@ -1904,9 +1904,9 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
/* Create a client channel over them to communicate with a LB service */
glb_policy->response_generator =
grpc_fake_resolver_response_generator_create();
- grpc_channel_args *lb_channel_args = build_lb_channel_args(
+ grpc_channel_args* lb_channel_args = build_lb_channel_args(
exec_ctx, addresses, glb_policy->response_generator, args->args);
- char *uri_str;
+ char* uri_str;
gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
@@ -1917,7 +1917,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
gpr_free(uri_str);
if (glb_policy->lb_channel == NULL) {
- gpr_free((void *)glb_policy->server_name);
+ gpr_free((void*)glb_policy->server_name);
grpc_channel_args_destroy(exec_ctx, glb_policy->args);
gpr_free(glb_policy);
return NULL;
@@ -1932,16 +1932,16 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
return &glb_policy->base;
}
-static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
+static void glb_factory_ref(grpc_lb_policy_factory* factory) {}
-static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
+static void glb_factory_unref(grpc_lb_policy_factory* factory) {}
static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
-grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
+grpc_lb_policy_factory* grpc_glb_lb_factory_create() {
return &glb_lb_policy_factory;
}
@@ -1949,15 +1949,15 @@ grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
// Only add client_load_reporting filter if the grpclb LB policy is used.
static bool maybe_add_client_load_reporting_filter(
- grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) {
- const grpc_channel_args *args =
+ grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, void* arg) {
+ const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
- const grpc_arg *channel_arg =
+ const grpc_arg* channel_arg =
grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
if (channel_arg != NULL && channel_arg->type == GRPC_ARG_STRING &&
strcmp(channel_arg->value.string, "grpclb") == 0) {
return grpc_channel_stack_builder_append_filter(
- builder, (const grpc_channel_filter *)arg, NULL, NULL);
+ builder, (const grpc_channel_filter*)arg, NULL, NULL);
}
return true;
}
@@ -1971,7 +1971,7 @@ extern "C" void grpc_lb_policy_grpclb_init() {
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_client_load_reporting_filter,
- (void *)&grpc_client_load_reporting_filter);
+ (void*)&grpc_client_load_reporting_filter);
}
extern "C" void grpc_lb_policy_grpclb_shutdown() {}