aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/census/grpc_filter.c4
-rw-r--r--src/core/ext/client_config/client_channel.c2
-rw-r--r--src/core/ext/client_config/lb_policy.h11
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c1039
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.h44
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.c141
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.h36
-rw-r--r--src/core/ext/load_reporting/load_reporting.c60
-rw-r--r--src/core/ext/load_reporting/load_reporting.h61
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c153
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c6
-rw-r--r--src/core/lib/channel/channel_stack.c4
-rw-r--r--src/core/lib/channel/channel_stack.h18
-rw-r--r--src/core/lib/channel/compress_filter.c3
-rw-r--r--src/core/lib/channel/connected_channel.c2
-rw-r--r--src/core/lib/channel/http_client_filter.c3
-rw-r--r--src/core/lib/channel/http_server_filter.c3
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c3
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c3
-rw-r--r--src/core/lib/surface/call.c60
-rw-r--r--src/core/lib/surface/lame_client.c2
-rw-r--r--src/core/lib/surface/server.c3
-rw-r--r--src/core/lib/transport/static_metadata.c19
-rw-r--r--src/core/lib/transport/static_metadata.h133
-rw-r--r--src/core/plugin_registry/grpc_plugin_registry.c4
-rw-r--r--src/core/plugin_registry/grpc_unsecure_plugin_registry.c4
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
29 files changed, 1543 insertions, 282 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index f51d850e01..3004a1fc97 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -138,7 +138,7 @@ static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- const grpc_call_stats *stats,
+ const grpc_call_final_info *final_info,
void *ignored) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
@@ -158,7 +158,7 @@ static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- const grpc_call_stats *stats,
+ const grpc_call_final_info *final_info,
void *ignored) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index a096435c98..739487a06b 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -444,7 +444,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory) {
grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
gpr_free(and_free_memory);
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 3cfd041d3a..a2f5446fc6 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -73,7 +73,7 @@ struct grpc_lb_policy_vtable {
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
- /** try to enter a READY connectivity state */
+ /** Try to enter a READY connectivity state */
void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** check the current connectivity of the lb_policy */
@@ -82,7 +82,9 @@ struct grpc_lb_policy_vtable {
grpc_error **connectivity_error);
/** call notify when the connectivity state of a channel changes from *state.
- Updates *state with the new state of the policy */
+ Updates *state with the new state of the policy. Calling with a NULL \a
+ state cancels the subscription.
+ */
void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
grpc_connectivity_state *state,
@@ -125,7 +127,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
/** Given initial metadata in \a initial_metadata, find an appropriate
target for this rpc, and 'return' it by calling \a on_complete after setting
\a target.
- Picking can be asynchronous. Any IO should be done under \a pollset. */
+ Picking can be asynchronous. Any IO should be done under \a pollent. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
@@ -147,8 +149,11 @@ void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
+/** Try to enter a READY connectivity state */
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+/* Call notify when the connectivity state of a channel changes from \a *state.
+ * Updates \a *state with the new state of the policy */
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
grpc_connectivity_state *state,
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
new file mode 100644
index 0000000000..dec25efe61
--- /dev/null
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -0,0 +1,1039 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/** Implementation of the gRPC LB policy.
+ *
+ * This policy takes as input a set of resolved addresses {a1..an} for which the
+ * LB set was set (it's the resolver's responsibility to ensure this). That is
+ * to say, {a1..an} represent a collection of LB servers.
+ *
+ * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
+ * This channel behaves just like a regular channel. In particular, the
+ * constructed URI over the addresses a1..an will use the default pick first
+ * policy to select from this list of LB server backends.
+ *
+ * The first time the policy gets a request for a pick, a ping, or to exit the
+ * idle state, \a query_for_backends() is called. It creates an instance of \a
+ * lb_client_data, an internal struct meant to contain the data associated with
+ * the internal communication with the LB server. This instance is created via
+ * \a lb_client_data_create(). There, the call over lb_channel to pick-first
+ * from {a1..an} is created, the \a LoadBalancingRequest message is assembled
+ * and all necessary callbacks for the progress of the internal call configured.
+ *
+ * Back in \a query_for_backends(), the internal *streaming* call to the LB
+ * server (whichever address from {a1..an} pick-first chose) is kicked off.
+ * It'll progress over the callbacks configured in \a lb_client_data_create()
+ * (see the field docstrings of \a lb_client_data for more details).
+ *
+ * If the call fails with UNIMPLEMENTED, the original call will also fail.
+ * There's a misconfiguration somewhere: at least one of {a1..an} isn't a LB
+ * server, which contradicts the LB bit being set. If the internal call times
+ * out, the usual behavior of pick-first applies, continuing to pick from the
+ * list {a1..an}.
+ *
+ * Upon sucesss, a \a LoadBalancingResponse is expected in \a res_recv_cb. An
+ * invalid one results in the termination of the streaming call. A new streaming
+ * call should be created if possible, failing the original call otherwise.
+ * For a valid \a LoadBalancingResponse, the server list of actual backends is
+ * extracted. A Round Robin policy will be created from this list. There are two
+ * possible scenarios:
+ *
+ * 1. This is the first server list received. There was no previous instance of
+ * the Round Robin policy. \a rr_handover() will instantiate the RR policy
+ * and perform all the pending operations over it.
+ * 2. There's already a RR policy instance active. We need to introduce the new
+ * one build from the new serverlist, but taking care not to disrupt the
+ * operations in progress over the old RR instance. This is done by
+ * decreasing the reference count on the old policy. The moment no more
+ * references are held on the old RR policy, it'll be destroyed and \a
+ * rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state.
+ * At this point we can transition to a new RR instance safely, which is done
+ * once again via \a rr_handover().
+ *
+ *
+ * Once a RR policy instance is in place (and getting updated as described),
+ * calls to for a pick, a ping or a cancellation will be serviced right away by
+ * forwarding them to the RR instance. Any time there's no RR policy available
+ * (ie, right after the creation of the gRPCLB policy, if an empty serverlist
+ * is received, etc), pick/ping requests are added to a list of pending
+ * picks/pings to be flushed and serviced as part of \a rr_handover() the moment
+ * the RR policy instance becomes available.
+ *
+ * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
+ * high level design and details. */
+
+/* TODO(dgq):
+ * - Implement LB service forwarding (point 2c. in the doc's diagram).
+ */
+
+#include <string.h>
+
+#include <grpc/byte_buffer_reader.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/client_config/client_channel_factory.h"
+#include "src/core/ext/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/parse_address.h"
+#include "src/core/ext/lb_policy/grpclb/grpclb.h"
+#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/call.h"
+#include "src/core/lib/surface/channel.h"
+
+int grpc_lb_glb_trace = 0;
+
+typedef struct wrapped_rr_closure_arg {
+ /* the original closure. Usually a on_complete/notify cb for pick() and ping()
+ * calls against the internal RR instance, respectively. */
+ grpc_closure *wrapped_closure;
+
+ /* The RR instance related to the closure */
+ grpc_lb_policy *rr_policy;
+
+ /* when not NULL, represents a pending_{pick,ping} node to be freed upon
+ * closure execution */
+ void *owning_pending_node; /* to be freed if not NULL */
+} wrapped_rr_closure_arg;
+
+/* The \a on_complete closure passed as part of the pick requires keeping a
+ * reference to its associated round robin instance. We wrap this closure in
+ * order to unref the round robin instance upon its invocation */
+static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ wrapped_rr_closure_arg *wc_arg = arg;
+ if (wc_arg->rr_policy != NULL) {
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
+ (intptr_t)wc_arg->rr_policy);
+ }
+ GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
+ }
+ GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+ grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
+ gpr_free(wc_arg->owning_pending_node);
+}
+
+/* Linked list of pending pick requests. It stores all information needed to
+ * eventually call (Round Robin's) pick() on them. They mainly stay pending
+ * waiting for the RR policy to be created/updated.
+ *
+ * One particularity is the wrapping of the user-provided \a on_complete closure
+ * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
+ * order to correctly unref the RR policy instance upon completion of the pick.
+ * See \a wrapped_rr_closure for details. */
+typedef struct pending_pick {
+ struct pending_pick *next;
+
+ /* polling entity for the pick()'s async notification */
+ grpc_polling_entity *pollent;
+
+ /* the initial metadata for the pick. See grpc_lb_policy_pick() */
+ grpc_metadata_batch *initial_metadata;
+
+ /* bitmask passed to pick() and used for selective cancelling. See
+ * grpc_lb_policy_cancel_picks() */
+ uint32_t initial_metadata_flags;
+
+ /* output argument where to store the pick()ed connected subchannel, or NULL
+ * upon error. */
+ grpc_connected_subchannel **target;
+
+ /* a closure wrapping the original on_complete one to be invoked once the
+ * pick() has completed (regardless of success) */
+ grpc_closure wrapped_on_complete;
+
+ /* args for wrapped_on_complete */
+ wrapped_rr_closure_arg wrapped_on_complete_arg;
+} pending_pick;
+
+static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
+ grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
+ grpc_connected_subchannel **target,
+ grpc_closure *on_complete) {
+ pending_pick *pp = gpr_malloc(sizeof(*pp));
+ memset(pp, 0, sizeof(pending_pick));
+ memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
+ pp->next = *root;
+ pp->pollent = pollent;
+ pp->target = target;
+ pp->initial_metadata = initial_metadata;
+ pp->initial_metadata_flags = initial_metadata_flags;
+ pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
+ grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
+ &pp->wrapped_on_complete_arg);
+ *root = pp;
+}
+
+/* Same as the \a pending_pick struct but for ping operations */
+typedef struct pending_ping {
+ struct pending_ping *next;
+
+ /* a closure wrapping the original on_complete one to be invoked once the
+ * ping() has completed (regardless of success) */
+ grpc_closure wrapped_notify;
+
+ /* args for wrapped_notify */
+ wrapped_rr_closure_arg wrapped_notify_arg;
+} pending_ping;
+
+static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
+ pending_ping *pping = gpr_malloc(sizeof(*pping));
+ memset(pping, 0, sizeof(pending_ping));
+ memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
+ pping->next = *root;
+ grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure,
+ &pping->wrapped_notify_arg);
+ pping->wrapped_notify_arg.wrapped_closure = notify;
+ *root = pping;
+}
+
+/*
+ * glb_lb_policy
+ */
+typedef struct rr_connectivity_data rr_connectivity_data;
+struct lb_client_data;
+static const grpc_lb_policy_vtable glb_lb_policy_vtable;
+typedef struct glb_lb_policy {
+ /** base policy: must be first */
+ grpc_lb_policy base;
+
+ /** mutex protecting remaining members */
+ gpr_mu mu;
+
+ grpc_client_channel_factory *cc_factory;
+
+ /** for communicating with the LB server */
+ grpc_channel *lb_channel;
+
+ /** the RR policy to use of the backend servers returned by the LB server */
+ grpc_lb_policy *rr_policy;
+
+ bool started_picking;
+
+ /** our connectivity state tracker */
+ grpc_connectivity_state_tracker state_tracker;
+
+ /** stores the deserialized response from the LB. May be NULL until one such
+ * response has arrived. */
+ grpc_grpclb_serverlist *serverlist;
+
+ /** list of picks that are waiting on RR's policy connectivity */
+ pending_pick *pending_picks;
+
+ /** list of pings that are waiting on RR's policy connectivity */
+ pending_ping *pending_pings;
+
+ /** client data associated with the LB server communication */
+ struct lb_client_data *lb_client;
+
+ /** for tracking of the RR connectivity */
+ rr_connectivity_data *rr_connectivity;
+
+ /* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily
+ * available RR picks */
+ grpc_closure wrapped_on_complete;
+
+ /* arguments for the wrapped_on_complete closure */
+ wrapped_rr_closure_arg wc_arg;
+} glb_lb_policy;
+
+/* Keeps track and reacts to changes in connectivity of the RR instance */
+struct rr_connectivity_data {
+ grpc_closure on_change;
+ grpc_connectivity_state state;
+ glb_lb_policy *glb_policy;
+};
+
+static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
+ const grpc_grpclb_serverlist *serverlist,
+ glb_lb_policy *glb_policy) {
+ /* TODO(dgq): support mixed ip version */
+ GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
+ char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers);
+ for (size_t i = 0; i < serverlist->num_servers; ++i) {
+ gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address,
+ serverlist->servers[i]->port);
+ }
+
+ size_t uri_path_len;
+ char *concat_ipports = gpr_strjoin_sep(
+ (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
+
+ grpc_lb_policy_args args;
+ args.client_channel_factory = glb_policy->cc_factory;
+ args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
+ args.addresses->naddrs = serverlist->num_servers;
+ args.addresses->addrs =
+ gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs);
+ size_t out_addrs_idx = 0;
+ for (size_t i = 0; i < serverlist->num_servers; ++i) {
+ grpc_uri uri;
+ struct sockaddr_storage sa;
+ size_t sa_len;
+ uri.path = host_ports[i];
+ if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
+ memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
+ args.addresses->addrs[out_addrs_idx].len = sa_len;
+ ++out_addrs_idx;
+ } else {
+ gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
+ host_ports[i]);
+ }
+ }
+
+ grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
+
+ gpr_free(concat_ipports);
+ for (size_t i = 0; i < serverlist->num_servers; i++) {
+ gpr_free(host_ports[i]);
+ }
+ gpr_free(host_ports);
+ gpr_free(args.addresses->addrs);
+ gpr_free(args.addresses);
+ return rr;
+}
+
+static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
+ grpc_error *error) {
+ GRPC_ERROR_REF(error);
+ glb_policy->rr_policy =
+ create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
+
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
+ (intptr_t)glb_policy->rr_policy);
+ }
+ GPR_ASSERT(glb_policy->rr_policy != NULL);
+ glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity(
+ exec_ctx, glb_policy->rr_policy, &error);
+ grpc_lb_policy_notify_on_state_change(
+ exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
+ &glb_policy->rr_connectivity->on_change);
+ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
+ glb_policy->rr_connectivity->state, error,
+ "rr_handover");
+ grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
+
+ /* flush pending ops */
+ pending_pick *pp;
+ while ((pp = glb_policy->pending_picks)) {
+ glb_policy->pending_picks = pp->next;
+ GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
+ pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
+ (intptr_t)glb_policy->rr_policy);
+ }
+ grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent,
+ pp->initial_metadata, pp->initial_metadata_flags,
+ pp->target, &pp->wrapped_on_complete);
+ pp->wrapped_on_complete_arg.owning_pending_node = pp;
+ }
+
+ pending_ping *pping;
+ while ((pping = glb_policy->pending_pings)) {
+ glb_policy->pending_pings = pping->next;
+ GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
+ pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
+ (intptr_t)glb_policy->rr_policy);
+ }
+ grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
+ &pping->wrapped_notify);
+ pping->wrapped_notify_arg.owning_pending_node = pping;
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ rr_connectivity_data *rr_conn_data = arg;
+ glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
+ if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
+ if (glb_policy->serverlist != NULL) {
+ /* a RR policy is shutting down but there's a serverlist available ->
+ * perform a handover */
+ rr_handover(exec_ctx, glb_policy, error);
+ } else {
+ /* shutting down and no new serverlist available. Bail out. */
+ gpr_free(rr_conn_data);
+ }
+ } else {
+ if (error == GRPC_ERROR_NONE) {
+ /* RR not shutting down. Mimic the RR's policy state */
+ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
+ rr_conn_data->state, error,
+ "rr_connectivity_changed");
+ /* resubscribe */
+ grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
+ &rr_conn_data->state,
+ &rr_conn_data->on_change);
+ } else { /* error */
+ gpr_free(rr_conn_data);
+ }
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_factory *factory,
+ grpc_lb_policy_args *args) {
+ glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
+ memset(glb_policy, 0, sizeof(*glb_policy));
+
+ /* All input addresses in args->addresses come from a resolver that claims
+ * they are LB services. It's the resolver's responsibility to make sure this
+ * policy is only instantiated and used in that case.
+ *
+ * Create a client channel over them to communicate with a LB service */
+ glb_policy->cc_factory = args->client_channel_factory;
+ GPR_ASSERT(glb_policy->cc_factory != NULL);
+ if (args->addresses->naddrs == 0) {
+ return NULL;
+ }
+
+ /* construct a target from the args->addresses, in the form
+ * ipvX://ip1:port1,ip2:port2,...
+ * TODO(dgq): support mixed ip version */
+ char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs);
+ addr_strs[0] =
+ grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]);
+ for (size_t i = 1; i < args->addresses->naddrs; i++) {
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_strs[i],
+ (const struct sockaddr *)&args->addresses->addrs[i],
+ true) == 0);
+ }
+ size_t uri_path_len;
+ char *target_uri_str = gpr_strjoin_sep(
+ (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len);
+
+ /* will pick using pick_first */
+ glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
+ exec_ctx, glb_policy->cc_factory, target_uri_str,
+ GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
+
+ gpr_free(target_uri_str);
+ for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ gpr_free(addr_strs[i]);
+ }
+ gpr_free(addr_strs);
+
+ if (glb_policy->lb_channel == NULL) {
+ gpr_free(glb_policy);
+ return NULL;
+ }
+
+ rr_connectivity_data *rr_connectivity =
+ gpr_malloc(sizeof(rr_connectivity_data));
+ memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
+ grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed,
+ rr_connectivity);
+ rr_connectivity->glb_policy = glb_policy;
+ glb_policy->rr_connectivity = rr_connectivity;
+
+ grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
+ gpr_mu_init(&glb_policy->mu);
+ grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
+ "grpclb");
+ return &glb_policy->base;
+}
+
+static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+ GPR_ASSERT(glb_policy->pending_picks == NULL);
+ GPR_ASSERT(glb_policy->pending_pings == NULL);
+ grpc_channel_destroy(glb_policy->lb_channel);
+ glb_policy->lb_channel = NULL;
+ grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
+ if (glb_policy->serverlist != NULL) {
+ grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
+ }
+ gpr_mu_destroy(&glb_policy->mu);
+ gpr_free(glb_policy);
+}
+
+static void lb_client_data_destroy(struct lb_client_data *lb_client);
+static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+ gpr_mu_lock(&glb_policy->mu);
+
+ pending_pick *pp = glb_policy->pending_picks;
+ glb_policy->pending_picks = NULL;
+ pending_ping *pping = glb_policy->pending_pings;
+ glb_policy->pending_pings = NULL;
+ gpr_mu_unlock(&glb_policy->mu);
+
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ *pp->target = NULL;
+ grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
+ NULL);
+ gpr_free(pp);
+ pp = next;
+ }
+
+ while (pping != NULL) {
+ pending_ping *next = pping->next;
+ grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE,
+ NULL);
+ pping = next;
+ }
+
+ if (glb_policy->rr_policy) {
+ /* unsubscribe */
+ grpc_lb_policy_notify_on_state_change(
+ exec_ctx, glb_policy->rr_policy, NULL,
+ &glb_policy->rr_connectivity->on_change);
+ GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
+ }
+
+ lb_client_data_destroy(glb_policy->lb_client);
+ glb_policy->lb_client = NULL;
+
+ grpc_connectivity_state_set(
+ exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown");
+}
+
+static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_connected_subchannel **target) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+ gpr_mu_lock(&glb_policy->mu);
+ pending_pick *pp = glb_policy->pending_picks;
+ glb_policy->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if (pp->target == target) {
+ grpc_polling_entity_del_from_pollset_set(
+ exec_ctx, pp->pollent, glb_policy->base.interested_parties);
+ *target = NULL;
+ grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
+ GRPC_ERROR_CANCELLED, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = glb_policy->pending_picks;
+ glb_policy->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&glb_policy->mu);
+}
+
+static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client);
+static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+ gpr_mu_lock(&glb_policy->mu);
+ if (glb_policy->lb_client != NULL) {
+ /* cancel the call to the load balancer service, if any */
+ grpc_call_cancel(lb_client_data_get_call(glb_policy->lb_client), NULL);
+ }
+ pending_pick *pp = glb_policy->pending_picks;
+ glb_policy->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ grpc_polling_entity_del_from_pollset_set(
+ exec_ctx, pp->pollent, glb_policy->base.interested_parties);
+ grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
+ GRPC_ERROR_CANCELLED, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = glb_policy->pending_picks;
+ glb_policy->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&glb_policy->mu);
+}
+
+static void query_for_backends(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy);
+static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) {
+ glb_policy->started_picking = true;
+ query_for_backends(exec_ctx, glb_policy);
+}
+
+static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+ gpr_mu_lock(&glb_policy->mu);
+ if (!glb_policy->started_picking) {
+ start_picking(exec_ctx, glb_policy);
+ }
+ gpr_mu_unlock(&glb_policy->mu);
+}
+
+static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_polling_entity *pollent,
+ grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
+ grpc_connected_subchannel **target,
+ grpc_closure *on_complete) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+ gpr_mu_lock(&glb_policy->mu);
+ int r;
+
+ if (glb_policy->rr_policy != NULL) {
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO, "about to PICK from 0x%" PRIxPTR "",
+ (intptr_t)glb_policy->rr_policy);
+ }
+ GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
+ memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
+ glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
+ glb_policy->wc_arg.wrapped_closure = on_complete;
+ grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
+ &glb_policy->wc_arg);
+ r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
+ initial_metadata, initial_metadata_flags, target,
+ &glb_policy->wrapped_on_complete);
+ if (r != 0) {
+ /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
+ * policy and notify the original callback */
+ glb_policy->wc_arg.wrapped_closure = NULL;
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
+ (intptr_t)glb_policy->wc_arg.rr_policy);
+ }
+ GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
+ grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure,
+ GRPC_ERROR_NONE, NULL);
+ }
+ } else {
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ glb_policy->base.interested_parties);
+ add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata,
+ initial_metadata_flags, target, on_complete);
+
+ if (!glb_policy->started_picking) {
+ start_picking(exec_ctx, glb_policy);
+ }
+ r = 0;
+ }
+ gpr_mu_unlock(&glb_policy->mu);
+ return r;
+}
+
+static grpc_connectivity_state glb_check_connectivity(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_error **connectivity_error) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+ grpc_connectivity_state st;
+ gpr_mu_lock(&glb_policy->mu);
+ st = grpc_connectivity_state_check(&glb_policy->state_tracker,
+ connectivity_error);
+ gpr_mu_unlock(&glb_policy->mu);
+ return st;
+}
+
+static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_closure *closure) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+ gpr_mu_lock(&glb_policy->mu);
+ if (glb_policy->rr_policy) {
+ grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure);
+ } else {
+ add_pending_ping(&glb_policy->pending_pings, closure);
+ if (!glb_policy->started_picking) {
+ start_picking(exec_ctx, glb_policy);
+ }
+ }
+ gpr_mu_unlock(&glb_policy->mu);
+}
+
+static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *pol,
+ grpc_connectivity_state *current,
+ grpc_closure *notify) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+ gpr_mu_lock(&glb_policy->mu);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &glb_policy->state_tracker, current, notify);
+
+ gpr_mu_unlock(&glb_policy->mu);
+}
+
+/*
+ * lb_client_data
+ *
+ * Used internally for the client call to the LB */
+typedef struct lb_client_data {
+ gpr_mu mu;
+
+ /* called once initial metadata's been sent */
+ grpc_closure md_sent;
+
+ /* called once initial metadata's been received */
+ grpc_closure md_rcvd;
+
+ /* called once the LoadBalanceRequest has been sent to the LB server. See
+ * src/proto/grpc/.../load_balancer.proto */
+ grpc_closure req_sent;
+
+ /* A response from the LB server has been received (or error). Process it */
+ grpc_closure res_rcvd;
+
+ /* After the client has sent a close to the LB server */
+ grpc_closure close_sent;
+
+ /* ... and the status from the LB server has been received */
+ grpc_closure srv_status_rcvd;
+
+ grpc_call *lb_call; /* streaming call to the LB server, */
+ gpr_timespec deadline; /* for the streaming call to the LB server */
+
+ grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */
+ grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */
+
+ /* what's being sent to the LB server. Note that its value may vary if the LB
+ * server indicates a redirect. */
+ grpc_byte_buffer *request_payload;
+
+ /* response from the LB server, if any. Processed in res_recv_cb() */
+ grpc_byte_buffer *response_payload;
+
+ /* the call's status and status detailset in srv_status_rcvd_cb() */
+ grpc_status_code status;
+ char *status_details;
+ size_t status_details_capacity;
+
+ /* pointer back to the enclosing policy */
+ glb_lb_policy *glb_policy;
+} lb_client_data;
+
+static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
+static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
+static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
+static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
+static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+
+static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
+ lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data));
+ memset(lb_client, 0, sizeof(lb_client_data));
+
+ gpr_mu_init(&lb_client->mu);
+ grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
+
+ grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client);
+ grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
+ grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
+ grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
+ grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client);
+
+ /* TODO(dgq): get the deadline from the client config instead of fabricating
+ * one here. */
+ lb_client->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_seconds(3, GPR_TIMESPAN));
+
+ lb_client->lb_call = grpc_channel_create_pollset_set_call(
+ glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
+ glb_policy->base.interested_parties, "/BalanceLoad",
+ NULL, /* FIXME(dgq): which "host" value to use? */
+ lb_client->deadline, NULL);
+
+ grpc_metadata_array_init(&lb_client->initial_metadata_recv);
+ grpc_metadata_array_init(&lb_client->trailing_metadata_recv);
+
+ grpc_grpclb_request *request = grpc_grpclb_request_create(
+ "load.balanced.service.name"); /* FIXME(dgq): get the name of the load
+ balanced service from the resolver */
+ gpr_slice request_payload_slice = grpc_grpclb_request_encode(request);
+ lb_client->request_payload =
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+ gpr_slice_unref(request_payload_slice);
+ grpc_grpclb_request_destroy(request);
+
+ lb_client->status_details = NULL;
+ lb_client->status_details_capacity = 0;
+ lb_client->glb_policy = glb_policy;
+ return lb_client;
+}
+
+static void lb_client_data_destroy(lb_client_data *lb_client) {
+ grpc_call_destroy(lb_client->lb_call);
+ grpc_metadata_array_destroy(&lb_client->initial_metadata_recv);
+ grpc_metadata_array_destroy(&lb_client->trailing_metadata_recv);
+
+ grpc_byte_buffer_destroy(lb_client->request_payload);
+
+ gpr_free(lb_client->status_details);
+ gpr_mu_destroy(&lb_client->mu);
+ gpr_free(lb_client);
+}
+static grpc_call *lb_client_data_get_call(lb_client_data *lb_client) {
+ return lb_client->lb_call;
+}
+
+/*
+ * Auxiliary functions and LB client callbacks.
+ */
+static void query_for_backends(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
+ GPR_ASSERT(glb_policy->lb_channel != NULL);
+
+ glb_policy->lb_client = lb_client_data_create(glb_policy);
+ grpc_call_error call_error;
+ grpc_op ops[1];
+ memset(ops, 0, sizeof(ops));
+ grpc_op *op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ call_error = grpc_call_start_batch_and_execute(
+ exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
+ &glb_policy->lb_client->md_sent);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+
+ op = ops;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata =
+ &glb_policy->lb_client->trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &glb_policy->lb_client->status;
+ op->data.recv_status_on_client.status_details =
+ &glb_policy->lb_client->status_details;
+ op->data.recv_status_on_client.status_details_capacity =
+ &glb_policy->lb_client->status_details_capacity;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ call_error = grpc_call_start_batch_and_execute(
+ exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
+ &glb_policy->lb_client->srv_status_rcvd);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+}
+
+static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ lb_client_data *lb_client = arg;
+ GPR_ASSERT(lb_client->lb_call);
+ grpc_op ops[1];
+ memset(ops, 0, sizeof(ops));
+ grpc_op *op = ops;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
+ &lb_client->md_rcvd);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+}
+
+static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ lb_client_data *lb_client = arg;
+ GPR_ASSERT(lb_client->lb_call);
+ grpc_op ops[1];
+ memset(ops, 0, sizeof(ops));
+ grpc_op *op = ops;
+
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = lb_client->request_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
+ &lb_client->req_sent);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+}
+
+static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ lb_client_data *lb_client = arg;
+
+ grpc_op ops[1];
+ memset(ops, 0, sizeof(ops));
+ grpc_op *op = ops;
+
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &lb_client->response_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
+ &lb_client->res_rcvd);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+}
+
+static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ lb_client_data *lb_client = arg;
+ grpc_op ops[2];
+ memset(ops, 0, sizeof(ops));
+ grpc_op *op = ops;
+ if (lb_client->response_payload != NULL) {
+ /* Received data from the LB server. Look inside
+ * lb_client->response_payload, for
+ * a serverlist. */
+ grpc_byte_buffer_reader bbr;
+ grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
+ gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
+ grpc_byte_buffer_destroy(lb_client->response_payload);
+ grpc_grpclb_serverlist *serverlist =
+ grpc_grpclb_response_parse_serverlist(response_slice);
+ if (serverlist != NULL) {
+ gpr_slice_unref(response_slice);
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO, "Serverlist with %zu servers received",
+ serverlist->num_servers);
+ }
+
+ /* update serverlist */
+ if (serverlist->num_servers > 0) {
+ if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist,
+ serverlist)) {
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO,
+ "Incoming server list identical to current, ignoring.");
+ }
+ } else { /* new serverlist */
+ if (lb_client->glb_policy->serverlist != NULL) {
+ /* dispose of the old serverlist */
+ grpc_grpclb_destroy_serverlist(lb_client->glb_policy->serverlist);
+ }
+ /* and update the copy in the glb_lb_policy instance */
+ lb_client->glb_policy->serverlist = serverlist;
+ }
+ if (lb_client->glb_policy->rr_policy == NULL) {
+ /* initial "handover", in this case from a null RR policy, meaning
+ * it'll just create the first RR policy instance */
+ rr_handover(exec_ctx, lb_client->glb_policy, error);
+ } else {
+ /* unref the RR policy, eventually leading to its substitution with a
+ * new one constructed from the received serverlist (see
+ * rr_connectivity_changed) */
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
+ "serverlist_received");
+ }
+ } else {
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO,
+ "Received empty server list. Picks will stay pending until a "
+ "response with > 0 servers is received");
+ }
+ }
+
+ /* keep listening for serverlist updates */
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &lb_client->response_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ const grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
+ &lb_client->res_rcvd); /* loop */
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ return;
+ }
+
+ GPR_ASSERT(serverlist == NULL);
+ gpr_log(GPR_ERROR, "Invalid LB response received: '%s'",
+ gpr_dump_slice(response_slice, GPR_DUMP_ASCII));
+ gpr_slice_unref(response_slice);
+
+ /* Disconnect from server returning invalid response. */
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
+ &lb_client->close_sent);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ }
+ /* empty payload: call cancelled by server. Cleanups happening in
+ * srv_status_rcvd_cb */
+}
+
+static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO,
+ "Close from LB client sent. Waiting from server status now");
+ }
+}
+
+static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ lb_client_data *lb_client = arg;
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO,
+ "status from lb server received. Status = %d, Details = '%s', "
+ "Capaticy "
+ "= %zu",
+ lb_client->status, lb_client->status_details,
+ lb_client->status_details_capacity);
+ }
+ /* TODO(dgq): deal with stream termination properly (fire up another one? fail
+ * the original call?) */
+}
+
+/* Code wiring the policy with the rest of the core */
+static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
+ glb_destroy, glb_shutdown, glb_pick,
+ glb_cancel_pick, glb_cancel_picks, glb_ping_one,
+ glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change};
+
+static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
+
+static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
+
+static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
+ glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
+
+static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
+
+grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
+ return &glb_lb_policy_factory;
+}
+
+/* Plugin registration */
+void grpc_lb_policy_grpclb_init() {
+ grpc_register_lb_policy(grpc_glb_lb_factory_create());
+ grpc_register_tracer("glb", &grpc_lb_glb_trace);
+}
+
+void grpc_lb_policy_grpclb_shutdown() {}
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.h b/src/core/ext/lb_policy/grpclb/grpclb.h
new file mode 100644
index 0000000000..83552b4fa0
--- /dev/null
+++ b/src/core/ext/lb_policy/grpclb/grpclb.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H
+#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H
+
+#include "src/core/ext/client_config/lb_policy_factory.h"
+
+/** Returns a load balancing factory for the glb policy, which tries to connect
+ * to a load balancing server to decide the next successfully connected
+ * subchannel to pick. */
+grpc_lb_policy_factory *grpc_glb_lb_factory_create();
+
+#endif /* GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H */
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
index 59b89997dd..f4720a1345 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
@@ -38,9 +38,15 @@
#include <grpc/support/alloc.h>
typedef struct decode_serverlist_arg {
- int first_pass;
- int i;
+ /* The first pass counts the number of servers in the server list. The second
+ * one allocates and decodes. */
+ bool first_pass;
+ /* The decoding callback is invoked once per server in serverlist. Remember
+ * which index of the serverlist are we currently decoding */
+ size_t decoding_idx;
+ /* Populated after the first pass. Number of server in the input serverlist */
size_t num_servers;
+ /* The decoded serverlist */
grpc_grpclb_server **servers;
} decode_serverlist_arg;
@@ -48,23 +54,24 @@ typedef struct decode_serverlist_arg {
static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
void **arg) {
decode_serverlist_arg *dec_arg = *arg;
- if (dec_arg->first_pass != 0) { /* first pass */
+ if (dec_arg->first_pass) { /* count how many server do we have */
grpc_grpclb_server server;
if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
return false;
}
dec_arg->num_servers++;
- } else { /* second pass */
+ } else { /* second pass. Actually decode. */
grpc_grpclb_server *server = gpr_malloc(sizeof(grpc_grpclb_server));
+ memset(server, 0, sizeof(grpc_grpclb_server));
GPR_ASSERT(dec_arg->num_servers > 0);
- if (dec_arg->i == 0) { /* first iteration of second pass */
+ if (dec_arg->decoding_idx == 0) { /* first iteration of second pass */
dec_arg->servers =
gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers);
}
if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
return false;
}
- dec_arg->servers[dec_arg->i++] = server;
+ dec_arg->servers[dec_arg->decoding_idx++] = server;
}
return true;
@@ -102,57 +109,59 @@ void grpc_grpclb_request_destroy(grpc_grpclb_request *request) {
gpr_free(request);
}
-grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response) {
- bool status;
+typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response;
+grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse(
+ gpr_slice encoded_grpc_grpclb_response) {
pb_istream_t stream =
- pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_response),
- GPR_SLICE_LENGTH(encoded_response));
- grpc_grpclb_response *res = gpr_malloc(sizeof(grpc_grpclb_response));
- memset(res, 0, sizeof(*res));
- status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, res);
- if (!status) {
- grpc_grpclb_response_destroy(res);
+ pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_grpc_grpclb_response),
+ GPR_SLICE_LENGTH(encoded_grpc_grpclb_response));
+ grpc_grpclb_response res;
+ memset(&res, 0, sizeof(grpc_grpclb_response));
+ if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) {
return NULL;
}
- return res;
+ grpc_grpclb_initial_response *initial_res =
+ gpr_malloc(sizeof(grpc_grpclb_initial_response));
+ memcpy(initial_res, &res.initial_response,
+ sizeof(grpc_grpclb_initial_response));
+
+ return initial_res;
}
grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
- gpr_slice encoded_response) {
+ gpr_slice encoded_grpc_grpclb_response) {
bool status;
decode_serverlist_arg arg;
pb_istream_t stream =
- pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_response),
- GPR_SLICE_LENGTH(encoded_response));
+ pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_grpc_grpclb_response),
+ GPR_SLICE_LENGTH(encoded_grpc_grpclb_response));
pb_istream_t stream_at_start = stream;
- grpc_grpclb_response *res = gpr_malloc(sizeof(grpc_grpclb_response));
- memset(res, 0, sizeof(*res));
+ grpc_grpclb_response res;
+ memset(&res, 0, sizeof(grpc_grpclb_response));
memset(&arg, 0, sizeof(decode_serverlist_arg));
- res->server_list.servers.funcs.decode = decode_serverlist;
- res->server_list.servers.arg = &arg;
- arg.first_pass = 1;
- status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, res);
+ res.server_list.servers.funcs.decode = decode_serverlist;
+ res.server_list.servers.arg = &arg;
+ arg.first_pass = true;
+ status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
- grpc_grpclb_response_destroy(res);
return NULL;
}
- arg.first_pass = 0;
+ arg.first_pass = false;
status =
- pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, res);
+ pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
- grpc_grpclb_response_destroy(res);
return NULL;
}
grpc_grpclb_serverlist *sl = gpr_malloc(sizeof(grpc_grpclb_serverlist));
+ memset(sl, 0, sizeof(*sl));
sl->num_servers = arg.num_servers;
sl->servers = arg.servers;
- if (res->server_list.has_expiration_interval) {
- sl->expiration_interval = res->server_list.expiration_interval;
+ if (res.server_list.has_expiration_interval) {
+ sl->expiration_interval = res.server_list.expiration_interval;
}
- grpc_grpclb_response_destroy(res);
return sl;
}
@@ -167,6 +176,72 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist) {
gpr_free(serverlist);
}
-void grpc_grpclb_response_destroy(grpc_grpclb_response *response) {
+grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy(
+ const grpc_grpclb_serverlist *sl) {
+ grpc_grpclb_serverlist *copy = gpr_malloc(sizeof(grpc_grpclb_serverlist));
+ memset(copy, 0, sizeof(grpc_grpclb_serverlist));
+ copy->num_servers = sl->num_servers;
+ memcpy(&copy->expiration_interval, &sl->expiration_interval,
+ sizeof(grpc_grpclb_duration));
+ copy->servers = gpr_malloc(sizeof(grpc_grpclb_server *) * sl->num_servers);
+ for (size_t i = 0; i < sl->num_servers; i++) {
+ copy->servers[i] = gpr_malloc(sizeof(grpc_grpclb_server));
+ memcpy(copy->servers[i], sl->servers[i], sizeof(grpc_grpclb_server));
+ }
+ return copy;
+}
+
+bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs,
+ const grpc_grpclb_serverlist *rhs) {
+ if ((lhs == NULL) || (rhs == NULL)) {
+ return false;
+ }
+ if (lhs->num_servers != rhs->num_servers) {
+ return false;
+ }
+ if (grpc_grpclb_duration_compare(&lhs->expiration_interval,
+ &rhs->expiration_interval) != 0) {
+ return false;
+ }
+ for (size_t i = 0; i < lhs->num_servers; i++) {
+ if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) {
+ return false;
+ }
+ }
+ return true;
+}
+
+bool grpc_grpclb_server_equals(const grpc_grpclb_server *lhs,
+ const grpc_grpclb_server *rhs) {
+ return memcmp(lhs, rhs, sizeof(grpc_grpclb_server)) == 0;
+}
+
+int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs,
+ const grpc_grpclb_duration *rhs) {
+ GPR_ASSERT(lhs && rhs);
+ if (lhs->has_seconds && rhs->has_seconds) {
+ if (lhs->seconds < rhs->seconds) return -1;
+ if (lhs->seconds > rhs->seconds) return 1;
+ } else if (lhs->has_seconds) {
+ return 1;
+ } else if (rhs->has_seconds) {
+ return -1;
+ }
+
+ GPR_ASSERT(lhs->seconds == rhs->seconds);
+ if (lhs->has_nanos && rhs->has_nanos) {
+ if (lhs->nanos < rhs->nanos) return -1;
+ if (lhs->nanos > rhs->nanos) return 1;
+ } else if (lhs->has_nanos) {
+ return 1;
+ } else if (rhs->has_nanos) {
+ return -1;
+ }
+
+ return 0;
+}
+
+void grpc_grpclb_initial_response_destroy(
+ grpc_grpclb_initial_response *response) {
gpr_free(response);
}
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
index 71b5616d0c..9726c87a37 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
@@ -46,7 +46,7 @@ extern "C" {
#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128
typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request;
-typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response;
+typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response;
typedef grpc_lb_v1_Server grpc_grpclb_server;
typedef grpc_lb_v1_Duration grpc_grpclb_duration;
typedef struct grpc_grpclb_serverlist {
@@ -64,19 +64,37 @@ gpr_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request);
/** Destroy \a request */
void grpc_grpclb_request_destroy(grpc_grpclb_request *request);
-/** Parse (ie, decode) the bytes in \a encoded_response as a \a
- * grpc_grpclb_response */
-grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response);
+/** Parse (ie, decode) the bytes in \a encoded_grpc_grpclb_response as a \a
+ * grpc_grpclb_initial_response */
+grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse(
+ gpr_slice encoded_grpc_grpclb_response);
+
+/** Parse the list of servers from an encoded \a grpc_grpclb_response */
+grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
+ gpr_slice encoded_grpc_grpclb_response);
+
+/** Return a copy of \a sl. The caller is responsible for calling \a
+ * grpc_grpclb_destroy_serverlist on the returned copy. */
+grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy(
+ const grpc_grpclb_serverlist *sl);
+
+bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs,
+ const grpc_grpclb_serverlist *rhs);
+
+bool grpc_grpclb_server_equals(const grpc_grpclb_server *lhs,
+ const grpc_grpclb_server *rhs);
/** Destroy \a serverlist */
void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist);
-/** Parse the list of servers from an encoded \a grpc_grpclb_response */
-grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
- gpr_slice encoded_response);
+/** Compare \a lhs against \a rhs and return 0 if \a lhs and \a rhs are equal,
+ * < 0 if \a lhs represents a duration shorter than \a rhs and > 0 otherwise */
+int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs,
+ const grpc_grpclb_duration *rhs);
-/** Destroy \a response */
-void grpc_grpclb_response_destroy(grpc_grpclb_response *response);
+/** Destroy \a initial_response */
+void grpc_grpclb_initial_response_destroy(
+ grpc_grpclb_initial_response *response);
#ifdef __cplusplus
}
diff --git a/src/core/ext/load_reporting/load_reporting.c b/src/core/ext/load_reporting/load_reporting.c
index 9e4d32676f..df1ea0ec9a 100644
--- a/src/core/ext/load_reporting/load_reporting.c
+++ b/src/core/ext/load_reporting/load_reporting.c
@@ -42,42 +42,12 @@
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/surface/channel_init.h"
-struct grpc_load_reporting_config {
- grpc_load_reporting_fn fn;
- void *user_data;
-};
-
-grpc_load_reporting_config *grpc_load_reporting_config_create(
- grpc_load_reporting_fn fn, void *user_data) {
- GPR_ASSERT(fn != NULL);
- grpc_load_reporting_config *lrc =
- gpr_malloc(sizeof(grpc_load_reporting_config));
- lrc->fn = fn;
- lrc->user_data = user_data;
- return lrc;
-}
-
-grpc_load_reporting_config *grpc_load_reporting_config_copy(
- grpc_load_reporting_config *src) {
- return grpc_load_reporting_config_create(src->fn, src->user_data);
-}
-
-void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lrc) {
- gpr_free(lrc);
-}
-
-void grpc_load_reporting_config_call(
- grpc_load_reporting_config *lrc,
- const grpc_load_reporting_call_data *call_data) {
- lrc->fn(call_data, lrc->user_data);
-}
-
static bool is_load_reporting_enabled(const grpc_channel_args *a) {
if (a == NULL) return false;
for (size_t i = 0; i < a->num_args; i++) {
if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_LOAD_REPORTING)) {
- return a->args[i].type == GRPC_ARG_POINTER &&
- a->args[i].value.pointer.p != NULL;
+ return a->args[i].type == GRPC_ARG_INTEGER &&
+ a->args[i].value.integer != 0;
}
}
return false;
@@ -94,37 +64,17 @@ static bool maybe_add_load_reporting_filter(grpc_channel_stack_builder *builder,
return true;
}
-static void lrd_arg_destroy(void *p) { grpc_load_reporting_config_destroy(p); }
-
-static void *lrd_arg_copy(void *p) {
- return grpc_load_reporting_config_copy(p);
-}
-
-static int lrd_arg_cmp(void *a, void *b) {
- grpc_load_reporting_config *lhs = a;
- grpc_load_reporting_config *rhs = b;
- return !(lhs->fn == rhs->fn && lhs->user_data == rhs->user_data);
-}
-
-static const grpc_arg_pointer_vtable lrd_ptr_vtable = {
- lrd_arg_copy, lrd_arg_destroy, lrd_arg_cmp};
-
-grpc_arg grpc_load_reporting_config_create_arg(
- grpc_load_reporting_config *lrc) {
+grpc_arg grpc_load_reporting_enable_arg() {
grpc_arg arg;
- arg.type = GRPC_ARG_POINTER;
+ arg.type = GRPC_ARG_INTEGER;
arg.key = GRPC_ARG_ENABLE_LOAD_REPORTING;
- arg.value.pointer.p = lrc;
- arg.value.pointer.vtable = &lrd_ptr_vtable;
+ arg.value.integer = 1;
return arg;
}
/* Plugin registration */
void grpc_load_reporting_plugin_init(void) {
- grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX,
- maybe_add_load_reporting_filter,
- (void *)&grpc_load_reporting_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
maybe_add_load_reporting_filter,
(void *)&grpc_load_reporting_filter);
diff --git a/src/core/ext/load_reporting/load_reporting.h b/src/core/ext/load_reporting/load_reporting.h
index 316cd89bd7..e37817d8c2 100644
--- a/src/core/ext/load_reporting/load_reporting.h
+++ b/src/core/ext/load_reporting/load_reporting.h
@@ -34,42 +34,47 @@
#ifndef GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_H
#define GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_H
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/surface/call.h"
+#include <grpc/impl/codegen/grpc_types.h>
+#include "src/core/lib/channel/channel_stack.h"
-typedef struct grpc_load_reporting_config grpc_load_reporting_config;
+/** Metadata key for initial metadata coming from clients */
+/* TODO(dgq): change to the final value TBD */
+#define GRPC_LOAD_REPORTING_INITIAL_MD_KEY "load-reporting-initial"
-/** Call information to be passed to the provided load reporting function upon
- * completion of the call */
-typedef struct grpc_load_reporting_call_data {
- const grpc_call_stats *stats; /**< Stats for the call */
- const char *trailing_md_string; /**< LR trailing metadata info */
-} grpc_load_reporting_call_data;
+/** Metadata key for trailing metadata from servers */
+/* TODO(dgq): change to the final value TBD */
+#define GRPC_LOAD_REPORTING_TRAILING_MD_KEY "load-reporting-trailing"
-/** Custom function to be called by the load reporting filter. */
-typedef void (*grpc_load_reporting_fn)(
- const grpc_load_reporting_call_data *call_data, void *user_data);
+/** Identifiers for the invocation point of the users LR callback */
+typedef enum grpc_load_reporting_source {
+ GRPC_LR_POINT_UNKNOWN = 0,
+ GRPC_LR_POINT_CHANNEL_CREATION,
+ GRPC_LR_POINT_CHANNEL_DESTRUCTION,
+ GRPC_LR_POINT_CALL_CREATION,
+ GRPC_LR_POINT_CALL_DESTRUCTION
+} grpc_load_reporting_source;
-/** Register \a fn as the function to be invoked by the load reporting filter.
- * \a fn will be invoked at the beginning and at the end of the call.
- *
- * For the first invocation, \a fn's first argument
- * (grpc_load_reporting_call_data*) will be NULL. \a user_data is always passed
- * as-is. */
-grpc_load_reporting_config *grpc_load_reporting_config_create(
- grpc_load_reporting_fn fn, void *user_data);
+/** Call information to be passed to the provided LR callback. */
+typedef struct grpc_load_reporting_call_data {
+ const grpc_load_reporting_source source; /**< point of last data update. */
+
+ /** Unique identifier for the channel associated with the data */
+ intptr_t channel_id;
-grpc_load_reporting_config *grpc_load_reporting_config_copy(
- grpc_load_reporting_config *src);
+ /** Unique identifier for the call associated with the data. If the call
+ * hasn't been created yet, it'll have a value of zero. */
+ intptr_t call_id;
-void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lrc);
+ /** Only valid when \a source is \a GRPC_LR_POINT_CALL_DESTRUCTION, that is,
+ * once the call has completed */
+ const grpc_call_final_info *final_info;
-/** Invoke the function registered by \a grpc_load_reporting_init. */
-void grpc_load_reporting_config_call(
- grpc_load_reporting_config *lrc,
- const grpc_load_reporting_call_data *call_data);
+ const char *initial_md_string; /**< value string for LR's initial md key */
+ const char *trailing_md_string; /**< value string for LR's trailing md key */
+ const char *method_name; /**< Corresponds to :path header */
+} grpc_load_reporting_call_data;
/** Return a \a grpc_arg enabling load reporting */
-grpc_arg grpc_load_reporting_config_create_arg(grpc_load_reporting_config *lrc);
+grpc_arg grpc_load_reporting_enable_arg();
#endif /* GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_H */
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index f372f88c3a..99b560ae27 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -31,6 +31,7 @@
*
*/
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
@@ -42,17 +43,67 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/transport/static_metadata.h"
-typedef struct call_data { const char *trailing_md_string; } call_data;
+typedef struct call_data {
+ intptr_t id; /**< an id unique to the call */
+ char *trailing_md_string;
+ char *initial_md_string;
+ const char *service_method;
+
+ /* stores the recv_initial_metadata op's ready closure, which we wrap with our
+ * own (on_initial_md_ready) in order to capture the incoming initial metadata
+ * */
+ grpc_closure *ops_recv_initial_metadata_ready;
+
+ /* to get notified of the availability of the incoming initial metadata. */
+ grpc_closure on_initial_md_ready;
+ grpc_metadata_batch *recv_initial_metadata;
+} call_data;
+
typedef struct channel_data {
- gpr_mu mu;
- grpc_load_reporting_config *lrc;
+ intptr_t id; /**< an id unique to the channel */
} channel_data;
-static void invoke_lr_fn_locked(grpc_load_reporting_config *lrc,
- grpc_load_reporting_call_data *lr_call_data) {
- GPR_TIMER_BEGIN("load_reporting_config_fn", 0);
- grpc_load_reporting_config_call(lrc, lr_call_data);
- GPR_TIMER_END("load_reporting_config_fn", 0);
+typedef struct {
+ grpc_call_element *elem;
+ grpc_exec_ctx *exec_ctx;
+} recv_md_filter_args;
+
+static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) {
+ recv_md_filter_args *a = user_data;
+ grpc_call_element *elem = a->elem;
+ call_data *calld = elem->call_data;
+
+ if (md->key == GRPC_MDSTR_PATH) {
+ calld->service_method = grpc_mdstr_as_c_string(md->value);
+ } else if (md->key == GRPC_MDSTR_LOAD_REPORTING_INITIAL) {
+ calld->initial_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
+ return NULL;
+ }
+
+ return md;
+}
+
+static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *err) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+
+ if (err == GRPC_ERROR_NONE) {
+ recv_md_filter_args a;
+ a.elem = elem;
+ a.exec_ctx = exec_ctx;
+ grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter,
+ &a);
+ if (calld->service_method == NULL) {
+ err =
+ grpc_error_add_child(err, GRPC_ERROR_CREATE("Missing :path header"));
+ }
+ } else {
+ GRPC_ERROR_REF(err);
+ }
+ calld->ops_recv_initial_metadata_ready->cb(
+ exec_ctx, calld->ops_recv_initial_metadata_ready->cb_arg, err);
+ GRPC_ERROR_UNREF(err);
}
/* Constructor for call_data */
@@ -60,20 +111,41 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(call_data));
+
+ calld->id = (intptr_t)args->call_stack;
+ grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem);
+
+ /* TODO(dgq): do something with the data
+ channel_data *chand = elem->channel_data;
+ grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_CREATION,
+ (intptr_t)chand->id,
+ (intptr_t)calld->id,
+ NULL,
+ NULL,
+ NULL,
+ NULL};
+ */
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {
- channel_data *chand = elem->channel_data;
+ const grpc_call_final_info *final_info,
+ void *ignored) {
call_data *calld = elem->call_data;
- grpc_load_reporting_call_data lr_call_data = {stats,
- calld->trailing_md_string};
-
- gpr_mu_lock(&chand->mu);
- invoke_lr_fn_locked(chand->lrc, &lr_call_data);
- gpr_mu_unlock(&chand->mu);
+ /* TODO(dgq): do something with the data
+ channel_data *chand = elem->channel_data;
+ grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_DESTRUCTION,
+ (intptr_t)chand->id,
+ (intptr_t)calld->id,
+ final_info,
+ calld->initial_md_string,
+ calld->trailing_md_string,
+ calld->service_method};
+ */
+
+ gpr_free(calld->initial_md_string);
+ gpr_free(calld->trailing_md_string);
}
/* Constructor for channel_data */
@@ -85,37 +157,40 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
memset(chand, 0, sizeof(channel_data));
- gpr_mu_init(&chand->mu);
- for (size_t i = 0; i < args->channel_args->num_args; i++) {
- if (0 == strcmp(args->channel_args->args[i].key,
- GRPC_ARG_ENABLE_LOAD_REPORTING)) {
- grpc_load_reporting_config *arg_lrc =
- args->channel_args->args[i].value.pointer.p;
- chand->lrc = grpc_load_reporting_config_copy(arg_lrc);
- GPR_ASSERT(chand->lrc != NULL);
- break;
- }
- }
- GPR_ASSERT(chand->lrc != NULL); /* arg actually found */
-
- gpr_mu_lock(&chand->mu);
- invoke_lr_fn_locked(chand->lrc, NULL);
- gpr_mu_unlock(&chand->mu);
+ chand->id = (intptr_t)args->channel_stack;
+
+ /* TODO(dgq): do something with the data
+ grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CHANNEL_CREATION,
+ (intptr_t)chand,
+ 0,
+ NULL,
+ NULL,
+ NULL,
+ NULL};
+ */
}
/* Destructor for channel data */
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
+ /* TODO(dgq): do something with the data
channel_data *chand = elem->channel_data;
- gpr_mu_destroy(&chand->mu);
- grpc_load_reporting_config_destroy(chand->lrc);
+ grpc_load_reporting_call_data lr_call_data = {
+ GRPC_LR_POINT_CHANNEL_DESTRUCTION,
+ (intptr_t)chand->id,
+ 0,
+ NULL,
+ NULL,
+ NULL,
+ NULL};
+ */
}
static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- if (md->key == GRPC_MDSTR_LOAD_REPORTING) {
+ if (md->key == GRPC_MDSTR_LOAD_REPORTING_TRAILING) {
calld->trailing_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
return NULL;
}
@@ -127,8 +202,14 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
GPR_TIMER_BEGIN("lr_start_transport_stream_op", 0);
+ call_data *calld = elem->call_data;
- if (op->send_trailing_metadata) {
+ if (op->recv_initial_metadata) {
+ calld->recv_initial_metadata = op->recv_initial_metadata;
+ /* substitute our callback for the higher callback */
+ calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->on_initial_md_ready;
+ } else if (op->send_trailing_metadata) {
grpc_metadata_batch_filter(op->send_trailing_metadata,
lr_trailing_md_filter, elem);
}
diff --git a/src/core/ext/load_reporting/load_reporting_filter.h b/src/core/ext/load_reporting/load_reporting_filter.h
index f69cd6fdc6..160ed32af9 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.h
+++ b/src/core/ext/load_reporting/load_reporting_filter.h
@@ -34,6 +34,7 @@
#ifndef GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_FILTER_H
#define GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_FILTER_H
+#include "src/core/ext/load_reporting/load_reporting.h"
#include "src/core/lib/channel/channel_stack.h"
extern const grpc_channel_filter grpc_load_reporting_filter;
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index e0d87725e9..8f7a1f55c6 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -202,6 +202,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("finalize_outbuf", 0);
+ bool is_first_data_frame = true;
while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
uint32_t max_outgoing =
@@ -266,6 +267,11 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
stream_writing->id, &stream_writing->flow_controlled_buffer,
send_bytes, is_last_frame, &stream_writing->stats,
&transport_writing->outbuf);
+ if (is_first_data_frame) {
+ /* TODO(dgq): this is a hack. It'll be fix in a future refactoring */
+ stream_writing->stats.data_bytes -= 5; /* discount grpc framing */
+ is_first_data_frame = false;
+ }
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing,
stream_writing, outgoing_window,
send_bytes);
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 87175d7943..f9b7347b89 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -217,7 +217,7 @@ void grpc_call_stack_ignore_set_pollset_or_pollset_set(
grpc_polling_entity *pollent) {}
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
- const grpc_call_stats *call_stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory) {
grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
size_t count = stack->count;
@@ -225,7 +225,7 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
/* destroy per-filter data */
for (i = 0; i < count; i++) {
- elems[i].filter->destroy_call_elem(exec_ctx, &elems[i], call_stats,
+ elems[i].filter->destroy_call_elem(exec_ctx, &elems[i], final_info,
i == count - 1 ? and_free_memory : NULL);
}
}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index d72c015b67..19d18ccf93 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -75,9 +75,14 @@ typedef struct {
typedef struct {
grpc_transport_stream_stats transport_stream_stats;
gpr_timespec latency; /* From call creating to enqueing of received status */
- grpc_status_code final_status;
} grpc_call_stats;
+/** Information about the call upon completion. */
+typedef struct {
+ grpc_call_stats stats;
+ grpc_status_code final_status;
+} grpc_call_final_info;
+
/* Channel filters specify:
1. the amount of memory needed in the channel & call (via the sizeof_XXX
members)
@@ -119,16 +124,17 @@ typedef struct {
The filter does not need to do any chaining.
The bottom filter of a stack will be passed a non-NULL pointer to
\a and_free_memory that should be passed to gpr_free when destruction
- is complete. */
+ is complete. \a final_info contains data about the completed call, mainly
+ for reporting purposes. */
void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory);
/* sizeof(per channel data) */
size_t sizeof_channel_data;
/* Initialize per-channel data.
- elem is initialized at the start of the call, and elem->channel_data is
- what needs initializing.
+ elem is initialized at the creating of the channel, and elem->channel_data
+ is what needs initializing.
is_first, is_last designate this elements position in the stack, and are
useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining */
@@ -243,7 +249,7 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
- const grpc_call_stats *call_stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory);
/* Ignore set pollset{_set} - used by filters if they don't care about pollsets
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 32ebe53ee6..af21ed794d 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -270,7 +270,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {
+ const grpc_call_final_info *final_info,
+ void *ignored) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
gpr_slice_buffer_destroy(&calld->slices);
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 0a7d27a1dc..73714369cd 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -104,7 +104,7 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 8057e251f0..f1ed22c0ad 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -184,7 +184,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {}
+ const grpc_call_final_info *final_info,
+ void *ignored) {}
static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
unsigned i;
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index d0beebd817..d52cc7d018 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -235,7 +235,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {}
+ const grpc_call_final_info *final_info,
+ void *ignored) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index ec21e03944..974d5ae479 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -284,7 +284,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
/* returns true if done, false if pending; if returning true, *error is set */
-#define MAX_WRITE_IOVEC 1024
+#define MAX_WRITE_IOVEC 1000
static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index ed7929aa27..382d30756a 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -282,7 +282,8 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {
+ const grpc_call_final_info *final_info,
+ void *ignored) {
call_data *calld = elem->call_data;
grpc_call_credentials_unref(calld->creds);
if (calld->host != NULL) {
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 12e789bde9..5f3d0dcd6e 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -226,7 +226,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {}
+ const grpc_call_final_info *final_info,
+ void *ignored) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index fc9df76dc1..59295f47f0 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -154,8 +154,9 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
- /* Call stats: only valid after trailing metadata received */
- grpc_call_stats stats;
+ /* Call data useful used for reporting. Only valid after the call has
+ * completed */
+ grpc_call_final_info final_info;
/* Compression algorithm for *incoming* data */
grpc_compression_algorithm incoming_compression_algorithm;
@@ -361,6 +362,25 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
+static void get_final_status(grpc_call *call,
+ void (*set_value)(grpc_status_code code,
+ void *user_data),
+ void *set_value_user_data) {
+ int i;
+ for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+ if (call->status[i].is_set) {
+ set_value(call->status[i].code, set_value_user_data);
+ return;
+ }
+ }
+ if (call->is_client) {
+ set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
+ } else {
+ set_value(GRPC_STATUS_OK, set_value_user_data);
+ }
+}
+
+static void set_status_value_directly(grpc_status_code status, void *dest);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
size_t i;
@@ -392,7 +412,11 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
grpc_channel *channel = c->channel;
- grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->stats, c);
+
+ get_final_status(call, set_status_value_directly,
+ &c->final_info.final_status);
+
+ grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
GPR_TIMER_END("destroy_call", 0);
}
@@ -414,24 +438,6 @@ static void set_status_details(grpc_call *call, status_source source,
}
}
-static void get_final_status(grpc_call *call,
- void (*set_value)(grpc_status_code code,
- void *user_data),
- void *set_value_user_data) {
- int i;
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (call->status[i].is_set) {
- set_value(call->status[i].code, set_value_user_data);
- return;
- }
- }
- if (call->is_client) {
- set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
- } else {
- set_value(GRPC_STATUS_OK, set_value_user_data);
- }
-}
-
static void set_status_from_error(grpc_call *call, status_source source,
grpc_error *error) {
intptr_t status;
@@ -1361,6 +1367,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
+ // sent_initial_metadata guards against variable reuse.
+ grpc_metadata compression_md;
+
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
@@ -1406,8 +1415,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
/* process compression level */
- grpc_metadata compression_md;
- memset(&compression_md, 0, sizeof(grpc_metadata));
+ memset(&compression_md, 0, sizeof(compression_md));
size_t additional_metadata_count = 0;
grpc_compression_level effective_compression_level;
bool level_set = false;
@@ -1608,7 +1616,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->recv_final_op = 1;
stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op.collect_stats = &call->stats.transport_stream_stats;
+ stream_op.collect_stats =
+ &call->final_info.stats.transport_stream_stats;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
/* Flag validation: currently allow no flags */
@@ -1630,7 +1639,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->recv_final_op = 1;
stream_op.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op.collect_stats = &call->stats.transport_stream_stats;
+ stream_op.collect_stats =
+ &call->final_info.stats.transport_stream_stats;
break;
}
}
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index 5ea4cba5d1..0d3168e56a 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -111,7 +111,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory) {
gpr_free(and_free_memory);
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 2f108af48a..a482ba43d8 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -872,7 +872,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {
+ const grpc_call_final_info *final_info,
+ void *ignored) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c
index c5f16e530d..c396c1e0b5 100644
--- a/src/core/lib/transport/static_metadata.c
+++ b/src/core/lib/transport/static_metadata.c
@@ -45,10 +45,10 @@ grpc_mdstr grpc_static_mdstr_table[GRPC_STATIC_MDSTR_COUNT];
grpc_mdelem grpc_static_mdelem_table[GRPC_STATIC_MDELEM_COUNT];
uintptr_t grpc_static_mdelem_user_data[GRPC_STATIC_MDELEM_COUNT] = {
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 4, 8, 6, 2, 4, 8, 6, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 4, 8, 6, 2, 4, 8, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
const uint8_t grpc_static_metadata_elem_indices[GRPC_STATIC_MDELEM_COUNT * 2] =
{11, 35, 10, 35, 12, 35, 12, 49, 13, 35, 14, 35, 15, 35, 16, 35, 17, 35,
@@ -56,10 +56,10 @@ const uint8_t grpc_static_metadata_elem_indices[GRPC_STATIC_MDELEM_COUNT * 2] =
30, 18, 30, 35, 31, 35, 32, 35, 36, 35, 37, 35, 38, 35, 39, 35, 42, 33,
42, 34, 42, 48, 42, 53, 42, 54, 42, 55, 42, 56, 43, 33, 43, 48, 43, 53,
46, 0, 46, 1, 46, 2, 50, 35, 57, 35, 58, 35, 59, 35, 60, 35, 61, 35,
- 62, 35, 63, 35, 64, 35, 65, 35, 66, 35, 67, 40, 67, 69, 67, 72, 68, 80,
- 68, 81, 70, 35, 71, 35, 73, 35, 74, 35, 75, 35, 76, 35, 77, 41, 77, 51,
- 77, 52, 78, 35, 79, 35, 82, 3, 82, 4, 82, 5, 82, 6, 82, 7, 82, 8,
- 82, 9, 83, 35, 84, 85, 86, 35, 87, 35, 88, 35, 89, 35, 90, 35};
+ 62, 35, 63, 35, 64, 35, 65, 35, 66, 35, 67, 35, 68, 40, 68, 70, 68, 73,
+ 69, 81, 69, 82, 71, 35, 72, 35, 74, 35, 75, 35, 76, 35, 77, 35, 78, 41,
+ 78, 51, 78, 52, 79, 35, 80, 35, 83, 3, 83, 4, 83, 5, 83, 6, 83, 7,
+ 83, 8, 83, 9, 84, 35, 85, 86, 87, 35, 88, 35, 89, 35, 90, 35, 91, 35};
const char *const grpc_static_metadata_strings[GRPC_STATIC_MDSTR_COUNT] = {
"0",
@@ -126,7 +126,8 @@ const char *const grpc_static_metadata_strings[GRPC_STATIC_MDSTR_COUNT] = {
"if-unmodified-since",
"last-modified",
"link",
- "load-reporting",
+ "load-reporting-initial",
+ "load-reporting-trailing",
"location",
"max-forwards",
":method",
diff --git a/src/core/lib/transport/static_metadata.h b/src/core/lib/transport/static_metadata.h
index 5ff0d2f3bc..491c8cf125 100644
--- a/src/core/lib/transport/static_metadata.h
+++ b/src/core/lib/transport/static_metadata.h
@@ -44,7 +44,7 @@
#include "src/core/lib/transport/metadata.h"
-#define GRPC_STATIC_MDSTR_COUNT 91
+#define GRPC_STATIC_MDSTR_COUNT 92
extern grpc_mdstr grpc_static_mdstr_table[GRPC_STATIC_MDSTR_COUNT];
/* "0" */
#define GRPC_MDSTR_0 (&grpc_static_mdstr_table[0])
@@ -175,62 +175,64 @@ extern grpc_mdstr grpc_static_mdstr_table[GRPC_STATIC_MDSTR_COUNT];
#define GRPC_MDSTR_LAST_MODIFIED (&grpc_static_mdstr_table[62])
/* "link" */
#define GRPC_MDSTR_LINK (&grpc_static_mdstr_table[63])
-/* "load-reporting" */
-#define GRPC_MDSTR_LOAD_REPORTING (&grpc_static_mdstr_table[64])
+/* "load-reporting-initial" */
+#define GRPC_MDSTR_LOAD_REPORTING_INITIAL (&grpc_static_mdstr_table[64])
+/* "load-reporting-trailing" */
+#define GRPC_MDSTR_LOAD_REPORTING_TRAILING (&grpc_static_mdstr_table[65])
/* "location" */
-#define GRPC_MDSTR_LOCATION (&grpc_static_mdstr_table[65])
+#define GRPC_MDSTR_LOCATION (&grpc_static_mdstr_table[66])
/* "max-forwards" */
-#define GRPC_MDSTR_MAX_FORWARDS (&grpc_static_mdstr_table[66])
+#define GRPC_MDSTR_MAX_FORWARDS (&grpc_static_mdstr_table[67])
/* ":method" */
-#define GRPC_MDSTR_METHOD (&grpc_static_mdstr_table[67])
+#define GRPC_MDSTR_METHOD (&grpc_static_mdstr_table[68])
/* ":path" */
-#define GRPC_MDSTR_PATH (&grpc_static_mdstr_table[68])
+#define GRPC_MDSTR_PATH (&grpc_static_mdstr_table[69])
/* "POST" */
-#define GRPC_MDSTR_POST (&grpc_static_mdstr_table[69])
+#define GRPC_MDSTR_POST (&grpc_static_mdstr_table[70])
/* "proxy-authenticate" */
-#define GRPC_MDSTR_PROXY_AUTHENTICATE (&grpc_static_mdstr_table[70])
+#define GRPC_MDSTR_PROXY_AUTHENTICATE (&grpc_static_mdstr_table[71])
/* "proxy-authorization" */
-#define GRPC_MDSTR_PROXY_AUTHORIZATION (&grpc_static_mdstr_table[71])
+#define GRPC_MDSTR_PROXY_AUTHORIZATION (&grpc_static_mdstr_table[72])
/* "PUT" */
-#define GRPC_MDSTR_PUT (&grpc_static_mdstr_table[72])
+#define GRPC_MDSTR_PUT (&grpc_static_mdstr_table[73])
/* "range" */
-#define GRPC_MDSTR_RANGE (&grpc_static_mdstr_table[73])
+#define GRPC_MDSTR_RANGE (&grpc_static_mdstr_table[74])
/* "referer" */
-#define GRPC_MDSTR_REFERER (&grpc_static_mdstr_table[74])
+#define GRPC_MDSTR_REFERER (&grpc_static_mdstr_table[75])
/* "refresh" */
-#define GRPC_MDSTR_REFRESH (&grpc_static_mdstr_table[75])
+#define GRPC_MDSTR_REFRESH (&grpc_static_mdstr_table[76])
/* "retry-after" */
-#define GRPC_MDSTR_RETRY_AFTER (&grpc_static_mdstr_table[76])
+#define GRPC_MDSTR_RETRY_AFTER (&grpc_static_mdstr_table[77])
/* ":scheme" */
-#define GRPC_MDSTR_SCHEME (&grpc_static_mdstr_table[77])
+#define GRPC_MDSTR_SCHEME (&grpc_static_mdstr_table[78])
/* "server" */
-#define GRPC_MDSTR_SERVER (&grpc_static_mdstr_table[78])
+#define GRPC_MDSTR_SERVER (&grpc_static_mdstr_table[79])
/* "set-cookie" */
-#define GRPC_MDSTR_SET_COOKIE (&grpc_static_mdstr_table[79])
+#define GRPC_MDSTR_SET_COOKIE (&grpc_static_mdstr_table[80])
/* "/" */
-#define GRPC_MDSTR_SLASH (&grpc_static_mdstr_table[80])
+#define GRPC_MDSTR_SLASH (&grpc_static_mdstr_table[81])
/* "/index.html" */
-#define GRPC_MDSTR_SLASH_INDEX_DOT_HTML (&grpc_static_mdstr_table[81])
+#define GRPC_MDSTR_SLASH_INDEX_DOT_HTML (&grpc_static_mdstr_table[82])
/* ":status" */
-#define GRPC_MDSTR_STATUS (&grpc_static_mdstr_table[82])
+#define GRPC_MDSTR_STATUS (&grpc_static_mdstr_table[83])
/* "strict-transport-security" */
-#define GRPC_MDSTR_STRICT_TRANSPORT_SECURITY (&grpc_static_mdstr_table[83])
+#define GRPC_MDSTR_STRICT_TRANSPORT_SECURITY (&grpc_static_mdstr_table[84])
/* "te" */
-#define GRPC_MDSTR_TE (&grpc_static_mdstr_table[84])
+#define GRPC_MDSTR_TE (&grpc_static_mdstr_table[85])
/* "trailers" */
-#define GRPC_MDSTR_TRAILERS (&grpc_static_mdstr_table[85])
+#define GRPC_MDSTR_TRAILERS (&grpc_static_mdstr_table[86])
/* "transfer-encoding" */
-#define GRPC_MDSTR_TRANSFER_ENCODING (&grpc_static_mdstr_table[86])
+#define GRPC_MDSTR_TRANSFER_ENCODING (&grpc_static_mdstr_table[87])
/* "user-agent" */
-#define GRPC_MDSTR_USER_AGENT (&grpc_static_mdstr_table[87])
+#define GRPC_MDSTR_USER_AGENT (&grpc_static_mdstr_table[88])
/* "vary" */
-#define GRPC_MDSTR_VARY (&grpc_static_mdstr_table[88])
+#define GRPC_MDSTR_VARY (&grpc_static_mdstr_table[89])
/* "via" */
-#define GRPC_MDSTR_VIA (&grpc_static_mdstr_table[89])
+#define GRPC_MDSTR_VIA (&grpc_static_mdstr_table[90])
/* "www-authenticate" */
-#define GRPC_MDSTR_WWW_AUTHENTICATE (&grpc_static_mdstr_table[90])
+#define GRPC_MDSTR_WWW_AUTHENTICATE (&grpc_static_mdstr_table[91])
-#define GRPC_STATIC_MDELEM_COUNT 80
+#define GRPC_STATIC_MDELEM_COUNT 81
extern grpc_mdelem grpc_static_mdelem_table[GRPC_STATIC_MDELEM_COUNT];
extern uintptr_t grpc_static_mdelem_user_data[GRPC_STATIC_MDELEM_COUNT];
/* "accept-charset": "" */
@@ -335,73 +337,76 @@ extern uintptr_t grpc_static_mdelem_user_data[GRPC_STATIC_MDELEM_COUNT];
#define GRPC_MDELEM_LAST_MODIFIED_EMPTY (&grpc_static_mdelem_table[45])
/* "link": "" */
#define GRPC_MDELEM_LINK_EMPTY (&grpc_static_mdelem_table[46])
-/* "load-reporting": "" */
-#define GRPC_MDELEM_LOAD_REPORTING_EMPTY (&grpc_static_mdelem_table[47])
+/* "load-reporting-initial": "" */
+#define GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY (&grpc_static_mdelem_table[47])
+/* "load-reporting-trailing": "" */
+#define GRPC_MDELEM_LOAD_REPORTING_TRAILING_EMPTY \
+ (&grpc_static_mdelem_table[48])
/* "location": "" */
-#define GRPC_MDELEM_LOCATION_EMPTY (&grpc_static_mdelem_table[48])
+#define GRPC_MDELEM_LOCATION_EMPTY (&grpc_static_mdelem_table[49])
/* "max-forwards": "" */
-#define GRPC_MDELEM_MAX_FORWARDS_EMPTY (&grpc_static_mdelem_table[49])
+#define GRPC_MDELEM_MAX_FORWARDS_EMPTY (&grpc_static_mdelem_table[50])
/* ":method": "GET" */
-#define GRPC_MDELEM_METHOD_GET (&grpc_static_mdelem_table[50])
+#define GRPC_MDELEM_METHOD_GET (&grpc_static_mdelem_table[51])
/* ":method": "POST" */
-#define GRPC_MDELEM_METHOD_POST (&grpc_static_mdelem_table[51])
+#define GRPC_MDELEM_METHOD_POST (&grpc_static_mdelem_table[52])
/* ":method": "PUT" */
-#define GRPC_MDELEM_METHOD_PUT (&grpc_static_mdelem_table[52])
+#define GRPC_MDELEM_METHOD_PUT (&grpc_static_mdelem_table[53])
/* ":path": "/" */
-#define GRPC_MDELEM_PATH_SLASH (&grpc_static_mdelem_table[53])
+#define GRPC_MDELEM_PATH_SLASH (&grpc_static_mdelem_table[54])
/* ":path": "/index.html" */
-#define GRPC_MDELEM_PATH_SLASH_INDEX_DOT_HTML (&grpc_static_mdelem_table[54])
+#define GRPC_MDELEM_PATH_SLASH_INDEX_DOT_HTML (&grpc_static_mdelem_table[55])
/* "proxy-authenticate": "" */
-#define GRPC_MDELEM_PROXY_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[55])
+#define GRPC_MDELEM_PROXY_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[56])
/* "proxy-authorization": "" */
-#define GRPC_MDELEM_PROXY_AUTHORIZATION_EMPTY (&grpc_static_mdelem_table[56])
+#define GRPC_MDELEM_PROXY_AUTHORIZATION_EMPTY (&grpc_static_mdelem_table[57])
/* "range": "" */
-#define GRPC_MDELEM_RANGE_EMPTY (&grpc_static_mdelem_table[57])
+#define GRPC_MDELEM_RANGE_EMPTY (&grpc_static_mdelem_table[58])
/* "referer": "" */
-#define GRPC_MDELEM_REFERER_EMPTY (&grpc_static_mdelem_table[58])
+#define GRPC_MDELEM_REFERER_EMPTY (&grpc_static_mdelem_table[59])
/* "refresh": "" */
-#define GRPC_MDELEM_REFRESH_EMPTY (&grpc_static_mdelem_table[59])
+#define GRPC_MDELEM_REFRESH_EMPTY (&grpc_static_mdelem_table[60])
/* "retry-after": "" */
-#define GRPC_MDELEM_RETRY_AFTER_EMPTY (&grpc_static_mdelem_table[60])
+#define GRPC_MDELEM_RETRY_AFTER_EMPTY (&grpc_static_mdelem_table[61])
/* ":scheme": "grpc" */
-#define GRPC_MDELEM_SCHEME_GRPC (&grpc_static_mdelem_table[61])
+#define GRPC_MDELEM_SCHEME_GRPC (&grpc_static_mdelem_table[62])
/* ":scheme": "http" */
-#define GRPC_MDELEM_SCHEME_HTTP (&grpc_static_mdelem_table[62])
+#define GRPC_MDELEM_SCHEME_HTTP (&grpc_static_mdelem_table[63])
/* ":scheme": "https" */
-#define GRPC_MDELEM_SCHEME_HTTPS (&grpc_static_mdelem_table[63])
+#define GRPC_MDELEM_SCHEME_HTTPS (&grpc_static_mdelem_table[64])
/* "server": "" */
-#define GRPC_MDELEM_SERVER_EMPTY (&grpc_static_mdelem_table[64])
+#define GRPC_MDELEM_SERVER_EMPTY (&grpc_static_mdelem_table[65])
/* "set-cookie": "" */
-#define GRPC_MDELEM_SET_COOKIE_EMPTY (&grpc_static_mdelem_table[65])
+#define GRPC_MDELEM_SET_COOKIE_EMPTY (&grpc_static_mdelem_table[66])
/* ":status": "200" */
-#define GRPC_MDELEM_STATUS_200 (&grpc_static_mdelem_table[66])
+#define GRPC_MDELEM_STATUS_200 (&grpc_static_mdelem_table[67])
/* ":status": "204" */
-#define GRPC_MDELEM_STATUS_204 (&grpc_static_mdelem_table[67])
+#define GRPC_MDELEM_STATUS_204 (&grpc_static_mdelem_table[68])
/* ":status": "206" */
-#define GRPC_MDELEM_STATUS_206 (&grpc_static_mdelem_table[68])
+#define GRPC_MDELEM_STATUS_206 (&grpc_static_mdelem_table[69])
/* ":status": "304" */
-#define GRPC_MDELEM_STATUS_304 (&grpc_static_mdelem_table[69])
+#define GRPC_MDELEM_STATUS_304 (&grpc_static_mdelem_table[70])
/* ":status": "400" */
-#define GRPC_MDELEM_STATUS_400 (&grpc_static_mdelem_table[70])
+#define GRPC_MDELEM_STATUS_400 (&grpc_static_mdelem_table[71])
/* ":status": "404" */
-#define GRPC_MDELEM_STATUS_404 (&grpc_static_mdelem_table[71])
+#define GRPC_MDELEM_STATUS_404 (&grpc_static_mdelem_table[72])
/* ":status": "500" */
-#define GRPC_MDELEM_STATUS_500 (&grpc_static_mdelem_table[72])
+#define GRPC_MDELEM_STATUS_500 (&grpc_static_mdelem_table[73])
/* "strict-transport-security": "" */
#define GRPC_MDELEM_STRICT_TRANSPORT_SECURITY_EMPTY \
- (&grpc_static_mdelem_table[73])
+ (&grpc_static_mdelem_table[74])
/* "te": "trailers" */
-#define GRPC_MDELEM_TE_TRAILERS (&grpc_static_mdelem_table[74])
+#define GRPC_MDELEM_TE_TRAILERS (&grpc_static_mdelem_table[75])
/* "transfer-encoding": "" */
-#define GRPC_MDELEM_TRANSFER_ENCODING_EMPTY (&grpc_static_mdelem_table[75])
+#define GRPC_MDELEM_TRANSFER_ENCODING_EMPTY (&grpc_static_mdelem_table[76])
/* "user-agent": "" */
-#define GRPC_MDELEM_USER_AGENT_EMPTY (&grpc_static_mdelem_table[76])
+#define GRPC_MDELEM_USER_AGENT_EMPTY (&grpc_static_mdelem_table[77])
/* "vary": "" */
-#define GRPC_MDELEM_VARY_EMPTY (&grpc_static_mdelem_table[77])
+#define GRPC_MDELEM_VARY_EMPTY (&grpc_static_mdelem_table[78])
/* "via": "" */
-#define GRPC_MDELEM_VIA_EMPTY (&grpc_static_mdelem_table[78])
+#define GRPC_MDELEM_VIA_EMPTY (&grpc_static_mdelem_table[79])
/* "www-authenticate": "" */
-#define GRPC_MDELEM_WWW_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[79])
+#define GRPC_MDELEM_WWW_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[80])
extern const uint8_t
grpc_static_metadata_elem_indices[GRPC_STATIC_MDELEM_COUNT * 2];
diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c
index 905cd59e23..7a7a9ce477 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_plugin_registry.c
@@ -37,6 +37,8 @@ extern void grpc_chttp2_plugin_init(void);
extern void grpc_chttp2_plugin_shutdown(void);
extern void grpc_client_config_init(void);
extern void grpc_client_config_shutdown(void);
+extern void grpc_lb_policy_grpclb_init(void);
+extern void grpc_lb_policy_grpclb_shutdown(void);
extern void grpc_lb_policy_pick_first_init(void);
extern void grpc_lb_policy_pick_first_shutdown(void);
extern void grpc_lb_policy_round_robin_init(void);
@@ -55,6 +57,8 @@ void grpc_register_built_in_plugins(void) {
grpc_chttp2_plugin_shutdown);
grpc_register_plugin(grpc_client_config_init,
grpc_client_config_shutdown);
+ grpc_register_plugin(grpc_lb_policy_grpclb_init,
+ grpc_lb_policy_grpclb_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init,
diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
index 7995078725..ad4ddf0ff4 100644
--- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
@@ -43,6 +43,8 @@ extern void grpc_resolver_sockaddr_init(void);
extern void grpc_resolver_sockaddr_shutdown(void);
extern void grpc_load_reporting_plugin_init(void);
extern void grpc_load_reporting_plugin_shutdown(void);
+extern void grpc_lb_policy_grpclb_init(void);
+extern void grpc_lb_policy_grpclb_shutdown(void);
extern void grpc_lb_policy_pick_first_init(void);
extern void grpc_lb_policy_pick_first_shutdown(void);
extern void grpc_lb_policy_round_robin_init(void);
@@ -61,6 +63,8 @@ void grpc_register_built_in_plugins(void) {
grpc_resolver_sockaddr_shutdown);
grpc_register_plugin(grpc_load_reporting_plugin_init,
grpc_load_reporting_plugin_shutdown);
+ grpc_register_plugin(grpc_lb_policy_grpclb_init,
+ grpc_lb_policy_grpclb_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init,
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index fc0908b2f1..e3f1820753 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -240,6 +240,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c',
+ 'src/core/ext/lb_policy/grpclb/grpclb.c',
'src/core/ext/lb_policy/grpclb/load_balancer_api.c',
'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'third_party/nanopb/pb_common.c',