diff options
Diffstat (limited to 'src/core/ext')
-rw-r--r-- | src/core/ext/client_config/lb_policy.h | 11 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 856 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.h | 44 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/load_balancer_api.c | 77 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/load_balancer_api.h | 16 |
5 files changed, 1001 insertions, 3 deletions
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index 3cfd041d3a..a2f5446fc6 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -73,7 +73,7 @@ struct grpc_lb_policy_vtable { void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_closure *closure); - /** try to enter a READY connectivity state */ + /** Try to enter a READY connectivity state */ void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** check the current connectivity of the lb_policy */ @@ -82,7 +82,9 @@ struct grpc_lb_policy_vtable { grpc_error **connectivity_error); /** call notify when the connectivity state of a channel changes from *state. - Updates *state with the new state of the policy */ + Updates *state with the new state of the policy. Calling with a NULL \a + state cancels the subscription. + */ void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connectivity_state *state, @@ -125,7 +127,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting \a target. - Picking can be asynchronous. Any IO should be done under \a pollset. */ + Picking can be asynchronous. Any IO should be done under \a pollent. */ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_polling_entity *pollent, grpc_metadata_batch *initial_metadata, @@ -147,8 +149,11 @@ void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq); +/** Try to enter a READY connectivity state */ void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); +/* Call notify when the connectivity state of a channel changes from \a *state. + * Updates \a *state with the new state of the policy */ void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connectivity_state *state, diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c new file mode 100644 index 0000000000..ede6b3e071 --- /dev/null +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -0,0 +1,856 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <string.h> + +#include <grpc/byte_buffer_reader.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/string_util.h> + +#include "src/core/ext/client_config/client_channel_factory.h" +#include "src/core/ext/client_config/lb_policy_registry.h" +#include "src/core/ext/client_config/parse_address.h" +#include "src/core/ext/lb_policy/grpclb/grpclb.h" +#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/surface/call.h" +#include "src/core/lib/surface/channel.h" + +int grpc_lb_glb_trace = 0; + +typedef struct wrapped_rr_closure_arg { + grpc_closure *wrapped_closure; + grpc_lb_policy *rr_policy; +} wrapped_rr_closure_arg; + +/* The \a on_complete closure passed as part of the pick requires keeping a + * reference to its associated round robin instance. We wrap this closure in + * order to unref the round robin instance upon its invocation */ +static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + wrapped_rr_closure_arg *wc = arg; + + if (wc->rr_policy != NULL) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", + (intptr_t)wc->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc->rr_policy, "wrapped_rr_closure"); + } + + if (wc->wrapped_closure != NULL) { + grpc_exec_ctx_sched(exec_ctx, wc->wrapped_closure, error, NULL); + } + gpr_free(wc); +} + +typedef struct pending_pick { + struct pending_pick *next; + grpc_polling_entity *pollent; + grpc_metadata_batch *initial_metadata; + uint32_t initial_metadata_flags; + grpc_connected_subchannel **target; + grpc_closure *wrapped_on_complete; + wrapped_rr_closure_arg *wrapped_on_complete_arg; +} pending_pick; + +typedef struct pending_ping { + struct pending_ping *next; + grpc_closure *wrapped_notify; + wrapped_rr_closure_arg *wrapped_notify_arg; +} pending_ping; + +typedef struct glb_lb_policy glb_lb_policy; + +#define MAX_LBCD_OPS_LEN 6 +typedef struct lb_client_data { + gpr_mu mu; + grpc_closure md_sent; + grpc_closure md_rcvd; + grpc_closure req_sent; + grpc_closure res_rcvd; + grpc_closure close_sent; + grpc_closure srv_status_rcvd; + + grpc_call *c; + gpr_timespec deadline; + + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + + grpc_byte_buffer *request_payload; + grpc_byte_buffer *response_payload; + + grpc_status_code status; + char *status_details; + size_t status_details_capacity; + + glb_lb_policy *p; +} lb_client_data; + +/* Keeps track and reacts to changes in connectivity of the RR instance */ +typedef struct rr_connectivity_data { + grpc_closure on_change; + grpc_connectivity_state state; + glb_lb_policy *p; +} rr_connectivity_data; + +struct glb_lb_policy { + /** base policy: must be first */ + grpc_lb_policy base; + + /** mutex protecting remaining members */ + gpr_mu mu; + + grpc_client_channel_factory *cc_factory; + + /** for communicating with the LB server */ + grpc_channel *lb_server_channel; + + /** the RR policy to use of the backend servers returned by the LB server */ + grpc_lb_policy *rr_policy; + + bool started_picking; + + /** our connectivity state tracker */ + grpc_connectivity_state_tracker state_tracker; + + grpc_grpclb_serverlist *serverlist; + + /** list of picks that are waiting on connectivity */ + pending_pick *pending_picks; + + /** list of pings that are waiting on connectivity */ + pending_ping *pending_pings; + + /** data associated with the communication with the LB server */ + lb_client_data *lbcd; + + /** for tracking of the RR connectivity */ + rr_connectivity_data *rr_connectivity; +}; + +static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *p, + grpc_error *error); +static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + rr_connectivity_data *rrcd = arg; + glb_lb_policy *p = rrcd->p; + if (rrcd->state == GRPC_CHANNEL_SHUTDOWN) { + if (p->serverlist != NULL) { + /* a RR policy is shutting down but there's a serverlist available -> + * perform a handover */ + rr_handover(exec_ctx, p, error); + } else { + /* shutting down and no new serverlist available. bail out. */ + gpr_free(rrcd); + } + } else { + if (error == GRPC_ERROR_NONE) { + /* not shutting down. mimic the RR's policy state */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, rrcd->state, + error, "rr_connectivity_changed"); + /* resubscribe */ + grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy, + &rrcd->state, &rrcd->on_change); + } else { /* error */ + gpr_free(rrcd); + } + } + GRPC_ERROR_UNREF(error); +} + +static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent, + grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, + grpc_connected_subchannel **target, + grpc_closure *on_complete) { + pending_pick *pp = gpr_malloc(sizeof(*pp)); + memset(pp, 0, sizeof(pending_pick)); + pp->wrapped_on_complete_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg)); + memset(pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); + pp->next = *root; + pp->pollent = pollent; + pp->target = target; + pp->initial_metadata = initial_metadata; + pp->initial_metadata_flags = initial_metadata_flags; + pp->wrapped_on_complete = + grpc_closure_create(wrapped_rr_closure, pp->wrapped_on_complete_arg); + pp->wrapped_on_complete_arg->wrapped_closure = on_complete; + *root = pp; +} + +static void add_pending_ping(pending_ping **root, grpc_closure *notify) { + pending_ping *pping = gpr_malloc(sizeof(*pping)); + memset(pping, 0, sizeof(pending_ping)); + pping->wrapped_notify_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg)); + memset(pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg)); + pping->next = *root; + pping->wrapped_notify = + grpc_closure_create(wrapped_rr_closure, pping->wrapped_notify_arg); + pping->wrapped_notify_arg->wrapped_closure = notify; + *root = pping; +} + +static void lb_client_data_destroy(lb_client_data *lbcd); + +static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + lb_client_data *lbcd = arg; + GPR_ASSERT(lbcd->c); + grpc_call_error call_error; + grpc_op ops[1]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &lbcd->initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + call_error = grpc_call_start_batch_and_execute( + exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->md_rcvd); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + lb_client_data *lbcd = arg; + GPR_ASSERT(lbcd->c); + grpc_call_error call_error; + grpc_op ops[1]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = lbcd->request_payload; + op->flags = 0; + op->reserved = NULL; + op++; + call_error = grpc_call_start_batch_and_execute( + exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->req_sent); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + lb_client_data *lbcd = arg; + grpc_call_error call_error; + + grpc_op ops[1]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &lbcd->response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + call_error = grpc_call_start_batch_and_execute( + exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->res_rcvd); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +static void res_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + /* look inside lbcd->response_payload, ideally to send it back as the + * serverlist. */ + lb_client_data *lbcd = arg; + grpc_op ops[2]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + if (lbcd->response_payload) { + grpc_byte_buffer_reader bbr; + grpc_byte_buffer_reader_init(&bbr, lbcd->response_payload); + gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); + grpc_byte_buffer_destroy(lbcd->response_payload); + grpc_grpclb_serverlist *serverlist = + grpc_grpclb_response_parse_serverlist(response_slice); + if (serverlist) { + gpr_slice_unref(response_slice); + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Serverlist with %zu servers received", + serverlist->num_servers); + } + /* update serverlist */ + if (serverlist->num_servers > 0) { + if (grpc_grpclb_serverlist_equals(lbcd->p->serverlist, serverlist)) { + gpr_log(GPR_INFO, + "Incoming server list identical to current, ignoring."); + } else { + if (lbcd->p->serverlist != NULL) { + grpc_grpclb_destroy_serverlist(lbcd->p->serverlist); + } + lbcd->p->serverlist = serverlist; + } + } + if (lbcd->p->rr_policy == NULL) { + /* initial "handover", in this case from a null RR policy, meaning it'll + * just create the first one */ + rr_handover(exec_ctx, lbcd->p, error); + } else { + /* unref the RR policy, eventually leading to its substitution with a + * new one constructed from the received serverlist (see + * rr_connectivity_changed) */ + GRPC_LB_POLICY_UNREF(exec_ctx, lbcd->p->rr_policy, + "serverlist_received"); + } + + /* listen for a potential serverlist update */ + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &lbcd->response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + const grpc_call_error call_error = grpc_call_start_batch_and_execute( + exec_ctx, lbcd->c, ops, (size_t)(op - ops), + &lbcd->res_rcvd); /* loop */ + GPR_ASSERT(GRPC_CALL_OK == call_error); + return; + } else { + gpr_log(GPR_ERROR, "Invalid LB response received: '%s'", + gpr_dump_slice(response_slice, GPR_DUMP_ASCII)); + gpr_slice_unref(response_slice); + + /* Disconnect from server returning invalid response. */ + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + grpc_call_error call_error = grpc_call_start_batch_and_execute( + exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->close_sent); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } + } + /* empty payload: call cancelled by server. Cleanups happening in + * srv_status_rcvd_cb */ +} +static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "Close from LB client sent. Waiting from server status now"); + } +} +static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + lb_client_data *lbcd = arg; + glb_lb_policy *p = lbcd->p; + if (grpc_lb_glb_trace) { + gpr_log( + GPR_INFO, + "status from lb server received. Status = %d, Details = '%s', Capaticy " + "= %zu", + lbcd->status, lbcd->status_details, lbcd->status_details_capacity); + } + + grpc_call_destroy(lbcd->c); + grpc_channel_destroy(lbcd->p->lb_server_channel); + lbcd->p->lb_server_channel = NULL; + lb_client_data_destroy(lbcd); + p->lbcd = NULL; +} + +static lb_client_data *lb_client_data_create(glb_lb_policy *p) { + lb_client_data *lbcd = gpr_malloc(sizeof(lb_client_data)); + memset(lbcd, 0, sizeof(lb_client_data)); + + gpr_mu_init(&lbcd->mu); + grpc_closure_init(&lbcd->md_sent, md_sent_cb, lbcd); + + grpc_closure_init(&lbcd->md_rcvd, md_recv_cb, lbcd); + grpc_closure_init(&lbcd->req_sent, req_sent_cb, lbcd); + grpc_closure_init(&lbcd->res_rcvd, res_rcvd_cb, lbcd); + grpc_closure_init(&lbcd->close_sent, close_sent_cb, lbcd); + grpc_closure_init(&lbcd->srv_status_rcvd, srv_status_rcvd_cb, lbcd); + + /* TODO(dgq): get the deadline from the client/user instead of fabricating + * one + * here. Make it a policy arg? */ + lbcd->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(3, GPR_TIMESPAN)); + + lbcd->c = grpc_channel_create_pollset_set_call( + p->lb_server_channel, NULL, GRPC_PROPAGATE_DEFAULTS, + p->base.interested_parties, "/BalanceLoad", + NULL, /* FIXME(dgq): which "host" value to use? */ + lbcd->deadline, NULL); + + grpc_metadata_array_init(&lbcd->initial_metadata_recv); + grpc_metadata_array_init(&lbcd->trailing_metadata_recv); + + grpc_grpclb_request *request = grpc_grpclb_request_create( + "load.balanced.service.name"); /* FIXME(dgq): get the name of the load + balanced service from above. */ + gpr_slice request_payload_slice = grpc_grpclb_request_encode(request); + lbcd->request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + gpr_slice_unref(request_payload_slice); + grpc_grpclb_request_destroy(request); + + lbcd->status_details = NULL; + lbcd->status_details_capacity = 0; + lbcd->p = p; + return lbcd; +} + +static void lb_client_data_destroy(lb_client_data *lbcd) { + grpc_metadata_array_destroy(&lbcd->initial_metadata_recv); + grpc_metadata_array_destroy(&lbcd->trailing_metadata_recv); + + grpc_byte_buffer_destroy(lbcd->request_payload); + + gpr_free(lbcd->status_details); + gpr_mu_destroy(&lbcd->mu); + gpr_free(lbcd); +} + +static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + glb_lb_policy *p = (glb_lb_policy *)pol; + GPR_ASSERT(p->pending_picks == NULL); + GPR_ASSERT(p->pending_pings == NULL); + grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + if (p->serverlist != NULL) { + grpc_grpclb_destroy_serverlist(p->serverlist); + } + gpr_mu_destroy(&p->mu); + gpr_free(p); +} + +static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + + pending_pick *pp = p->pending_picks; + p->pending_picks = NULL; + pending_ping *pping = p->pending_pings; + p->pending_pings = NULL; + gpr_mu_unlock(&p->mu); + + while (pp != NULL) { + pending_pick *next = pp->next; + *pp->target = NULL; + grpc_exec_ctx_sched(exec_ctx, pp->wrapped_on_complete, GRPC_ERROR_NONE, + NULL); + gpr_free(pp); + pp = next; + } + + while (pping != NULL) { + pending_ping *next = pping->next; + grpc_exec_ctx_sched(exec_ctx, pping->wrapped_notify, GRPC_ERROR_NONE, NULL); + pping = next; + } + + if (p->rr_policy) { + /* unsubscribe */ + grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy, NULL, + &p->rr_connectivity->on_change); + GRPC_LB_POLICY_UNREF(exec_ctx, p->rr_policy, "glb_shutdown"); + } + + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown"); +} + +static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_connected_subchannel **target) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + pending_pick *pp = p->pending_picks; + p->pending_picks = NULL; + while (pp != NULL) { + pending_pick *next = pp->next; + if (pp->target == target) { + grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, + p->base.interested_parties); + *target = NULL; + grpc_exec_ctx_sched(exec_ctx, pp->wrapped_on_complete, + GRPC_ERROR_CANCELLED, NULL); + gpr_free(pp); + } else { + pp->next = p->pending_picks; + p->pending_picks = pp; + } + pp = next; + } + gpr_mu_unlock(&p->mu); +} + +static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (p->lbcd != NULL) { + /* cancel the call to the load balancer service, if any */ + grpc_call_cancel(p->lbcd->c, NULL); + } + pending_pick *pp = p->pending_picks; + p->pending_picks = NULL; + while (pp != NULL) { + pending_pick *next = pp->next; + if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == + initial_metadata_flags_eq) { + grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, + p->base.interested_parties); + grpc_exec_ctx_sched(exec_ctx, pp->wrapped_on_complete, + GRPC_ERROR_CANCELLED, NULL); + gpr_free(pp); + } else { + pp->next = p->pending_picks; + p->pending_picks = pp; + } + pp = next; + } + gpr_mu_unlock(&p->mu); +} + +static void query_for_backends(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) { + GPR_ASSERT(p->lb_server_channel != NULL); + + p->lbcd = lb_client_data_create(p); + grpc_call_error call_error; + grpc_op ops[1]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + call_error = grpc_call_start_batch_and_execute( + exec_ctx, p->lbcd->c, ops, (size_t)(op - ops), &p->lbcd->md_sent); + GPR_ASSERT(GRPC_CALL_OK == call_error); + + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &p->lbcd->trailing_metadata_recv; + op->data.recv_status_on_client.status = &p->lbcd->status; + op->data.recv_status_on_client.status_details = &p->lbcd->status_details; + op->data.recv_status_on_client.status_details_capacity = + &p->lbcd->status_details_capacity; + op->flags = 0; + op->reserved = NULL; + op++; + call_error = grpc_call_start_batch_and_execute( + exec_ctx, p->lbcd->c, ops, (size_t)(op - ops), &p->lbcd->srv_status_rcvd); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, + const grpc_grpclb_serverlist *serverlist, + glb_lb_policy *p) { + /* TODO(dgq): support mixed ip version */ + GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); + char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers); + for (size_t i = 0; i < serverlist->num_servers; ++i) { + gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address, + serverlist->servers[i]->port); + } + + size_t uri_path_len; + char *concat_ipports = gpr_strjoin_sep( + (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len); + + grpc_lb_policy_args args; + args.client_channel_factory = p->cc_factory; + args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); + args.addresses->naddrs = serverlist->num_servers; + args.addresses->addrs = + gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs); + size_t out_addrs_idx = 0; + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_uri uri; + struct sockaddr_storage sa; + size_t sa_len; + uri.path = host_ports[i]; + if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */ + memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len); + args.addresses->addrs[out_addrs_idx].len = sa_len; + ++out_addrs_idx; + } else { + gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.", + host_ports[i]); + } + } + + grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); + + gpr_free(concat_ipports); + for (size_t i = 0; i < serverlist->num_servers; i++) { + gpr_free(host_ports[i]); + } + gpr_free(host_ports); + + gpr_free(args.addresses->addrs); + gpr_free(args.addresses); + + return rr; +} + +static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) { + p->started_picking = true; + query_for_backends(exec_ctx, p); +} + +static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *p, + grpc_error *error) { + GRPC_ERROR_REF(error); + p->rr_policy = create_rr(exec_ctx, p->serverlist, p); + + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")", + (intptr_t)p->rr_policy); + } + GPR_ASSERT(p->rr_policy != NULL); + p->rr_connectivity->state = + grpc_lb_policy_check_connectivity(exec_ctx, p->rr_policy, &error); + grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy, + &p->rr_connectivity->state, + &p->rr_connectivity->on_change); + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + p->rr_connectivity->state, error, "rr_handover"); + grpc_lb_policy_exit_idle(exec_ctx, p->rr_policy); + + /* flush pending ops */ + pending_pick *pp; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + GRPC_LB_POLICY_REF(p->rr_policy, "rr_handover_pending_pick"); + pp->wrapped_on_complete_arg->rr_policy = p->rr_policy; + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", + (intptr_t)p->rr_policy); + } + grpc_lb_policy_pick(exec_ctx, p->rr_policy, pp->pollent, + pp->initial_metadata, pp->initial_metadata_flags, + pp->target, pp->wrapped_on_complete); + gpr_free(pp); + } + + pending_ping *pping; + while ((pping = p->pending_pings)) { + p->pending_pings = pping->next; + GRPC_LB_POLICY_REF(p->rr_policy, "rr_handover_pending_ping"); + pping->wrapped_notify_arg->rr_policy = p->rr_policy; + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", + (intptr_t)p->rr_policy); + } + grpc_lb_policy_ping_one(exec_ctx, p->rr_policy, pping->wrapped_notify); + gpr_free(pping); + } + GRPC_ERROR_UNREF(error); +} + +static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (!p->started_picking) { + start_picking(exec_ctx, p); + } + gpr_mu_unlock(&p->mu); +} + +static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_polling_entity *pollent, + grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, + grpc_connected_subchannel **target, + grpc_closure *on_complete) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + int r; + + if (p->rr_policy != NULL) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "about to PICK from 0x%" PRIxPTR "", + (intptr_t)p->rr_policy); + } + GRPC_LB_POLICY_REF(p->rr_policy, "rr_pick"); + wrapped_rr_closure_arg *warg = gpr_malloc(sizeof(wrapped_rr_closure_arg)); + warg->rr_policy = p->rr_policy; + warg->wrapped_closure = on_complete; + grpc_closure *wrapped_on_complete = + grpc_closure_create(wrapped_rr_closure, warg); + r = grpc_lb_policy_pick(exec_ctx, p->rr_policy, pollent, initial_metadata, + initial_metadata_flags, target, + wrapped_on_complete); + if (r != 0) { + /* the call to grpc_lb_policy_pick has been sychronous. Invoke a neutered + * wrapped closure */ + warg->wrapped_closure = NULL; + grpc_exec_ctx_sched(exec_ctx, wrapped_on_complete, GRPC_ERROR_NONE, NULL); + } + } else { + grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, + p->base.interested_parties); + add_pending_pick(&p->pending_picks, pollent, initial_metadata, + initial_metadata_flags, target, on_complete); + + if (!p->started_picking) { + start_picking(exec_ctx, p); + } + r = 0; + } + gpr_mu_unlock(&p->mu); + return r; +} + +static grpc_connectivity_state glb_check_connectivity( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_error **connectivity_error) { + glb_lb_policy *p = (glb_lb_policy *)pol; + grpc_connectivity_state st; + gpr_mu_lock(&p->mu); + st = grpc_connectivity_state_check(&p->state_tracker, connectivity_error); + gpr_mu_unlock(&p->mu); + return st; +} + +static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (p->rr_policy) { + grpc_lb_policy_ping_one(exec_ctx, p->rr_policy, closure); + } else { + add_pending_ping(&p->pending_pings, closure); + if (!p->started_picking) { + start_picking(exec_ctx, p); + } + } + gpr_mu_unlock(&p->mu); +} + +static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, + current, notify); + + gpr_mu_unlock(&p->mu); +} + +static const grpc_lb_policy_vtable glb_lb_policy_vtable = { + glb_destroy, glb_shutdown, glb_pick, + glb_cancel_pick, glb_cancel_picks, glb_ping_one, + glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change}; + +static void glb_factory_ref(grpc_lb_policy_factory *factory) {} + +static void glb_factory_unref(grpc_lb_policy_factory *factory) {} + +static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + glb_lb_policy *p = gpr_malloc(sizeof(*p)); + memset(p, 0, sizeof(*p)); + + /* all input addresses in args->addresses come from a resolver that claims + * they are LB services. + * + * Create a client channel over them to communicate with a LB service */ + p->cc_factory = args->client_channel_factory; + GPR_ASSERT(p->cc_factory != NULL); + if (args->addresses->naddrs == 0) { + return NULL; + } + + /* construct a target from the args->addresses, in the form + * ipvX://ip1:port1,ip2:port2,... + * TODO(dgq): support mixed ip version */ + char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs); + addr_strs[0] = + grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]); + for (size_t i = 1; i < args->addresses->naddrs; i++) { + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_strs[i], + (const struct sockaddr *)&args->addresses->addrs[i], + true) == 0); + } + size_t uri_path_len; + char *target_uri_str = gpr_strjoin_sep( + (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len); + + /* will pick using pick_first */ + p->lb_server_channel = grpc_client_channel_factory_create_channel( + exec_ctx, p->cc_factory, target_uri_str, + GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); + + gpr_free(target_uri_str); + for (size_t i = 0; i < args->addresses->naddrs; i++) { + gpr_free(addr_strs[i]); + } + gpr_free(addr_strs); + + if (p->lb_server_channel == NULL) { + gpr_free(p); + return NULL; + } + + rr_connectivity_data *rr_connectivity = + gpr_malloc(sizeof(rr_connectivity_data)); + memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); + grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed, + rr_connectivity); + rr_connectivity->p = p; + p->rr_connectivity = rr_connectivity; + + grpc_lb_policy_init(&p->base, &glb_lb_policy_vtable); + gpr_mu_init(&p->mu); + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); + return &p->base; +} + +static const grpc_lb_policy_factory_vtable glb_factory_vtable = { + glb_factory_ref, glb_factory_unref, glb_create, "grpclb"}; + +static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable}; + +grpc_lb_policy_factory *grpc_glb_lb_factory_create() { + return &glb_lb_policy_factory; +} + +/* Plugin registration */ + +void grpc_lb_policy_grpclb_init() { + grpc_register_lb_policy(grpc_glb_lb_factory_create()); + grpc_register_tracer("glb", &grpc_lb_glb_trace); +} + +void grpc_lb_policy_grpclb_shutdown() {} diff --git a/src/core/ext/lb_policy/grpclb/grpclb.h b/src/core/ext/lb_policy/grpclb/grpclb.h new file mode 100644 index 0000000000..83552b4fa0 --- /dev/null +++ b/src/core/ext/lb_policy/grpclb/grpclb.h @@ -0,0 +1,44 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H +#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H + +#include "src/core/ext/client_config/lb_policy_factory.h" + +/** Returns a load balancing factory for the glb policy, which tries to connect + * to a load balancing server to decide the next successfully connected + * subchannel to pick. */ +grpc_lb_policy_factory *grpc_glb_lb_factory_create(); + +#endif /* GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H */ diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c index 59b89997dd..6c77612e0d 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c @@ -56,6 +56,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, dec_arg->num_servers++; } else { /* second pass */ grpc_grpclb_server *server = gpr_malloc(sizeof(grpc_grpclb_server)); + memset(server, 0, sizeof(grpc_grpclb_server)); GPR_ASSERT(dec_arg->num_servers > 0); if (dec_arg->i == 0) { /* first iteration of second pass */ dec_arg->servers = @@ -147,6 +148,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( } grpc_grpclb_serverlist *sl = gpr_malloc(sizeof(grpc_grpclb_serverlist)); + memset(sl, 0, sizeof(*sl)); sl->num_servers = arg.num_servers; sl->servers = arg.servers; if (res->server_list.has_expiration_interval) { @@ -167,6 +169,81 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist) { gpr_free(serverlist); } +grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy( + const grpc_grpclb_serverlist *sl) { + grpc_grpclb_serverlist *copy = gpr_malloc(sizeof(grpc_grpclb_serverlist)); + memset(copy, 0, sizeof(grpc_grpclb_serverlist)); + copy->num_servers = sl->num_servers; + memcpy(©->expiration_interval, &sl->expiration_interval, + sizeof(grpc_grpclb_duration)); + copy->servers = gpr_malloc(sizeof(grpc_grpclb_server *) * sl->num_servers); + for (size_t i = 0; i < sl->num_servers; i++) { + copy->servers[i] = gpr_malloc(sizeof(grpc_grpclb_server)); + memcpy(copy->servers[i], sl->servers[i], sizeof(grpc_grpclb_server)); + } + return copy; +} + +bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs, + const grpc_grpclb_serverlist *rhs) { + if ((lhs == NULL) || (rhs == NULL)) { + return false; + } + if (lhs->num_servers != rhs->num_servers) { + return false; + } + if (grpc_grpclb_duration_compare(&lhs->expiration_interval, + &rhs->expiration_interval) != 0) { + return false; + } + for (size_t i = 0; i < lhs->num_servers; i++) { + if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) { + return false; + } + } + return true; +} + +bool grpc_grpclb_server_equals(const grpc_grpclb_server *lhs, + const grpc_grpclb_server *rhs) { + return memcmp(lhs, rhs, sizeof(grpc_grpclb_server)) == 0; +} + +int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, + const grpc_grpclb_duration *rhs) { + GPR_ASSERT(lhs && rhs); + if (lhs->has_seconds && rhs->has_seconds) { + if (lhs->seconds < rhs->seconds) { + return -1; + } + if (lhs->seconds > rhs->seconds) { + return 1; + } + goto compare_nanos; + } + if (lhs->has_seconds + rhs->has_seconds == 0) { + goto compare_nanos; + } + // only lhs or rhs have seconds + return rhs->has_seconds ? 1 : -1; + +compare_nanos: + if (lhs->has_nanos && rhs->has_nanos) { + if (lhs->nanos < rhs->nanos) { + return -1; + } + if (lhs->nanos > rhs->nanos) { + return 1; + } + return 0; + } + if (lhs->has_nanos + rhs->has_nanos == 0) { + return 0; + } + // only lhs or rhs have nanos + return rhs->has_nanos ? 1 : -1; +} + void grpc_grpclb_response_destroy(grpc_grpclb_response *response) { gpr_free(response); } diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h index 71b5616d0c..cc5c4e9343 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h @@ -68,6 +68,11 @@ void grpc_grpclb_request_destroy(grpc_grpclb_request *request); * grpc_grpclb_response */ grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response); +/** Return a copy of \a sl. The caller is responsible for calling \a + * grpc_grpclb_destroy_serverlist on the returned copy. */ +grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy( + const grpc_grpclb_serverlist *sl); + /** Destroy \a serverlist */ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist); @@ -75,6 +80,17 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist); grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( gpr_slice encoded_response); +bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs, + const grpc_grpclb_serverlist *rhs); + +bool grpc_grpclb_server_equals(const grpc_grpclb_server *lhs, + const grpc_grpclb_server *rhs); + +/** Compare \a lhs against \a rhs and return 0 if \a lhs and \a rhs are equal, + * < 0 if \a lhs represents a duration shorter than \a rhs and > 0 otherwise */ +int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, + const grpc_grpclb_duration *rhs); + /** Destroy \a response */ void grpc_grpclb_response_destroy(grpc_grpclb_response *response); |