diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-22 09:25:57 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-22 09:25:57 -0700 |
commit | 10ee2747a92a20c0bbe8cf3e2e759a121c6cb076 (patch) | |
tree | d295d20b29d36c273216368e93b5fcdb65892f76 /src/core | |
parent | d9fdaf204cbca6472f5588eba83f37c01bb6c8a4 (diff) | |
parent | 1ca05139a2d3b05518d73200b53ccb86161eeef3 (diff) |
Merge github.com:grpc/grpc into we-are-one
Diffstat (limited to 'src/core')
20 files changed, 1023 insertions, 244 deletions
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index d0adafc048..0aa708694d 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -70,10 +70,10 @@ typedef struct channel_data { typedef struct { grpc_call_element *elem; grpc_call_list *call_list; -} client_filter_args; +} client_recv_filter_args; -static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) { - client_filter_args *a = user_data; +static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { + client_recv_filter_args *a = user_data; grpc_call_element *elem = a->elem; channel_data *channeld = elem->channel_data; if (md == channeld->status) { @@ -81,6 +81,8 @@ static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) { } else if (md->key == channeld->status->key) { grpc_call_element_send_cancel(elem, a->call_list); return NULL; + } else if (md->key == channeld->content_type->key) { + return NULL; } return md; } @@ -94,12 +96,12 @@ static void hc_on_recv(void *user_data, int success, grpc_stream_op *ops = calld->recv_ops->ops; for (i = 0; i < nops; i++) { grpc_stream_op *op = &ops[i]; - client_filter_args a; + client_recv_filter_args a; if (op->type != GRPC_OP_METADATA) continue; calld->got_initial_metadata = 1; a.elem = elem; a.call_list = call_list; - grpc_metadata_batch_filter(&op->data.metadata, client_filter, &a); + grpc_metadata_batch_filter(&op->data.metadata, client_recv_filter, &a); } calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success, call_list); } diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 70cc4f298a..448ce9a5b1 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -117,8 +117,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return NULL; } else if (md->key == channeld->te_trailers->key || md->key == channeld->method_post->key || - md->key == channeld->http_scheme->key || - md->key == channeld->content_type->key) { + md->key == channeld->http_scheme->key) { gpr_log(GPR_ERROR, "Invalid %s: header: '%s'", grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); /* swallow it and error everything out. */ diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c new file mode 100644 index 0000000000..a346007aca --- /dev/null +++ b/src/core/client_config/lb_policies/round_robin.c @@ -0,0 +1,554 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/client_config/lb_policies/round_robin.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include "src/core/transport/connectivity_state.h" + +int grpc_lb_round_robin_trace = 0; + +/** List of entities waiting for a pick. + * + * Once a pick is available, \a target is updated and \a on_complete called. */ +typedef struct pending_pick { + struct pending_pick *next; + grpc_pollset *pollset; + grpc_subchannel **target; + grpc_closure *on_complete; +} pending_pick; + +/** List of subchannels in a connectivity READY state */ +typedef struct ready_list { + grpc_subchannel *subchannel; + struct ready_list *next; + struct ready_list *prev; +} ready_list; + +typedef struct { + size_t subchannel_idx; /**< Index over p->subchannels */ + void *p; /**< round_robin_lb_policy instance */ +} connectivity_changed_cb_arg; + +typedef struct { + /** base policy: must be first */ + grpc_lb_policy base; + + /** all our subchannels */ + grpc_subchannel **subchannels; + size_t num_subchannels; + + /** Callbacks, one per subchannel being watched, to be called when their + * respective connectivity changes */ + grpc_closure *connectivity_changed_cbs; + connectivity_changed_cb_arg *cb_args; + + /** mutex protecting remaining members */ + gpr_mu mu; + /** have we started picking? */ + int started_picking; + /** are we shutting down? */ + int shutdown; + /** Connectivity state of the subchannels being watched */ + grpc_connectivity_state *subchannel_connectivity; + /** List of picks that are waiting on connectivity */ + pending_pick *pending_picks; + + /** our connectivity state tracker */ + grpc_connectivity_state_tracker state_tracker; + + /** (Dummy) root of the doubly linked list containing READY subchannels */ + ready_list ready_list; + /** Last pick from the ready list. */ + ready_list *ready_list_last_pick; + + /** Subchannel index to ready_list node. + * + * Kept in order to remove nodes from the ready list associated with a + * subchannel */ + ready_list **subchannel_index_to_readylist_node; +} round_robin_lb_policy; + +/** Returns the next subchannel from the connected list or NULL if the list is + * empty. + * + * Note that this function does *not* advance p->ready_list_last_pick. Use \a + * advance_last_picked_locked() for that. */ +static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { + ready_list *selected; + selected = p->ready_list_last_pick->next; + + while (selected != NULL) { + if (selected == &p->ready_list) { + GPR_ASSERT(selected->subchannel == NULL); + /* skip dummy root */ + selected = selected->next; + } else { + GPR_ASSERT(selected->subchannel != NULL); + return selected; + } + } + return NULL; +} + +/** Advance the \a ready_list picking head. */ +static void advance_last_picked_locked(round_robin_lb_policy *p) { + if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ + p->ready_list_last_pick = p->ready_list_last_pick->next; + if (p->ready_list_last_pick == &p->ready_list) { + /* skip dummy root */ + p->ready_list_last_pick = p->ready_list_last_pick->next; + } + } else { /* should be an empty list */ + GPR_ASSERT(p->ready_list_last_pick == &p->ready_list); + } + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", + p->ready_list_last_pick, p->ready_list_last_pick->subchannel); + } +} + +/** Prepends (relative to the root at p->ready_list) the connected subchannel \a + * csc to the list of ready subchannels. */ +static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, + grpc_subchannel *csc) { + ready_list *new_elem = gpr_malloc(sizeof(ready_list)); + new_elem->subchannel = csc; + if (p->ready_list.prev == NULL) { + /* first element */ + new_elem->next = &p->ready_list; + new_elem->prev = &p->ready_list; + p->ready_list.next = new_elem; + p->ready_list.prev = new_elem; + } else { + new_elem->next = &p->ready_list; + new_elem->prev = p->ready_list.prev; + p->ready_list.prev->next = new_elem; + p->ready_list.prev = new_elem; + } + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); + } + return new_elem; +} + +/** Removes \a node from the list of connected subchannels */ +static void remove_disconnected_sc_locked(round_robin_lb_policy *p, + ready_list *node) { + if (node == NULL) { + return; + } + if (node == p->ready_list_last_pick) { + /* If removing the lastly picked node, reset the last pick pointer to the + * dummy root of the list */ + p->ready_list_last_pick = &p->ready_list; + } + + /* removing last item */ + if (node->next == &p->ready_list && node->prev == &p->ready_list) { + GPR_ASSERT(p->ready_list.next == node); + GPR_ASSERT(p->ready_list.prev == node); + p->ready_list.next = NULL; + p->ready_list.prev = NULL; + } else { + node->prev->next = node->next; + node->next->prev = node->prev; + } + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, + node->subchannel); + } + + node->next = NULL; + node->prev = NULL; + node->subchannel = NULL; + + gpr_free(node); +} + +static void del_interested_parties_locked(round_robin_lb_policy *p, + const size_t subchannel_idx, + grpc_call_list *call_list) { + pending_pick *pp; + for (pp = p->pending_picks; pp; pp = pp->next) { + grpc_subchannel_del_interested_party(p->subchannels[subchannel_idx], + pp->pollset, call_list); + } +} + +void rr_destroy(grpc_lb_policy *pol, grpc_call_list *call_list) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + size_t i; + ready_list *elem; + for (i = 0; i < p->num_subchannels; i++) { + del_interested_parties_locked(p, i, call_list); + } + for (i = 0; i < p->num_subchannels; i++) { + GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "round_robin", call_list); + } + gpr_free(p->connectivity_changed_cbs); + gpr_free(p->subchannel_connectivity); + + grpc_connectivity_state_destroy(&p->state_tracker, call_list); + gpr_free(p->subchannels); + gpr_mu_destroy(&p->mu); + + elem = p->ready_list.next; + while (elem != NULL && elem != &p->ready_list) { + ready_list *tmp; + tmp = elem->next; + elem->next = NULL; + elem->prev = NULL; + elem->subchannel = NULL; + gpr_free(elem); + elem = tmp; + } + gpr_free(p->subchannel_index_to_readylist_node); + gpr_free(p->cb_args); + gpr_free(p); +} + +void rr_shutdown(grpc_lb_policy *pol, grpc_call_list *call_list) { + size_t i; + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + pending_pick *pp; + gpr_mu_lock(&p->mu); + + for (i = 0; i < p->num_subchannels; i++) { + del_interested_parties_locked(p, i, call_list); + } + + p->shutdown = 1; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_call_list_add(call_list, pp->on_complete, 0); + gpr_free(pp); + } + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, + "shutdown", call_list); + gpr_mu_unlock(&p->mu); +} + +static void start_picking(round_robin_lb_policy *p, grpc_call_list *call_list) { + size_t i; + p->started_picking = 1; + + for (i = 0; i < p->num_subchannels; i++) { + p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; + grpc_subchannel_notify_on_state_change( + p->subchannels[i], &p->subchannel_connectivity[i], + &p->connectivity_changed_cbs[i], call_list); + GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); + } +} + +void rr_exit_idle(grpc_lb_policy *pol, grpc_call_list *call_list) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (!p->started_picking) { + start_picking(p, call_list); + } + gpr_mu_unlock(&p->mu); +} + +void rr_pick(grpc_lb_policy *pol, grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, grpc_subchannel **target, + grpc_closure *on_complete, grpc_call_list *call_list) { + size_t i; + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + pending_pick *pp; + ready_list *selected; + gpr_mu_lock(&p->mu); + if ((selected = peek_next_connected_locked(p))) { + gpr_mu_unlock(&p->mu); + *target = selected->subchannel; + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", + selected->subchannel, selected); + } + /* only advance the last picked pointer if the selection was used */ + advance_last_picked_locked(p); + on_complete->cb(on_complete->cb_arg, 1, call_list); + } else { + if (!p->started_picking) { + start_picking(p, call_list); + } + for (i = 0; i < p->num_subchannels; i++) { + grpc_subchannel_add_interested_party(p->subchannels[i], pollset, + call_list); + } + pp = gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->pollset = pollset; + pp->target = target; + pp->on_complete = on_complete; + p->pending_picks = pp; + gpr_mu_unlock(&p->mu); + } +} + +static void rr_connectivity_changed(void *arg, int iomgr_success, + grpc_call_list *call_list) { + connectivity_changed_cb_arg *cb_arg = arg; + round_robin_lb_policy *p = cb_arg->p; + /* index over p->subchannels of this cb's subchannel */ + const size_t this_idx = cb_arg->subchannel_idx; + pending_pick *pp; + ready_list *selected; + + int unref = 0; + + /* connectivity state of this cb's subchannel */ + grpc_connectivity_state *this_connectivity; + + gpr_mu_lock(&p->mu); + + this_connectivity = &p->subchannel_connectivity[this_idx]; + + if (p->shutdown) { + unref = 1; + } else { + switch (*this_connectivity) { + case GRPC_CHANNEL_READY: + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, + "connecting_ready", call_list); + /* add the newly connected subchannel to the list of connected ones. + * Note that it goes to the "end of the line". */ + p->subchannel_index_to_readylist_node[this_idx] = + add_connected_sc_locked(p, p->subchannels[this_idx]); + /* at this point we know there's at least one suitable subchannel. Go + * ahead and pick one and notify the pending suitors in + * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ + selected = peek_next_connected_locked(p); + if (p->pending_picks != NULL) { + /* if the selected subchannel is going to be used for the pending + * picks, update the last picked pointer */ + advance_last_picked_locked(p); + } + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = selected->subchannel; + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, + "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", + selected->subchannel, selected); + } + grpc_subchannel_del_interested_party(selected->subchannel, + pp->pollset, call_list); + grpc_call_list_add(call_list, pp->on_complete, 1); + gpr_free(pp); + } + grpc_subchannel_notify_on_state_change( + p->subchannels[this_idx], this_connectivity, + &p->connectivity_changed_cbs[this_idx], call_list); + break; + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: + grpc_connectivity_state_set(&p->state_tracker, *this_connectivity, + "connecting_changed", call_list); + grpc_subchannel_notify_on_state_change( + p->subchannels[this_idx], this_connectivity, + &p->connectivity_changed_cbs[this_idx], call_list); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + del_interested_parties_locked(p, this_idx, call_list); + /* renew state notification */ + grpc_subchannel_notify_on_state_change( + p->subchannels[this_idx], this_connectivity, + &p->connectivity_changed_cbs[this_idx], call_list); + + /* remove from ready list if still present */ + if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { + remove_disconnected_sc_locked( + p, p->subchannel_index_to_readylist_node[this_idx]); + p->subchannel_index_to_readylist_node[this_idx] = NULL; + } + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "connecting_transient_failure", call_list); + break; + case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked(p, this_idx, call_list); + if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { + remove_disconnected_sc_locked( + p, p->subchannel_index_to_readylist_node[this_idx]); + p->subchannel_index_to_readylist_node[this_idx] = NULL; + } + + GPR_SWAP(grpc_subchannel *, p->subchannels[this_idx], + p->subchannels[p->num_subchannels - 1]); + p->num_subchannels--; + GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "round_robin", + call_list); + + if (p->num_subchannels == 0) { + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, + "no_more_channels", call_list); + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_call_list_add(call_list, pp->on_complete, 1); + gpr_free(pp); + } + unref = 1; + } else { + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "subchannel_failed", call_list); + } + } /* switch */ + } /* !unref */ + + gpr_mu_unlock(&p->mu); + + if (unref) { + GRPC_LB_POLICY_UNREF(&p->base, "round_robin_connectivity", call_list); + } +} + +static void rr_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, + grpc_call_list *call_list) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + size_t i; + size_t n; + grpc_subchannel **subchannels; + + gpr_mu_lock(&p->mu); + n = p->num_subchannels; + subchannels = gpr_malloc(n * sizeof(*subchannels)); + for (i = 0; i < n; i++) { + subchannels[i] = p->subchannels[i]; + GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast"); + } + gpr_mu_unlock(&p->mu); + + for (i = 0; i < n; i++) { + grpc_subchannel_process_transport_op(subchannels[i], op, call_list); + GRPC_SUBCHANNEL_UNREF(subchannels[i], "rr_broadcast", call_list); + } + gpr_free(subchannels); +} + +static grpc_connectivity_state rr_check_connectivity( + grpc_lb_policy *pol, grpc_call_list *call_list) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + grpc_connectivity_state st; + gpr_mu_lock(&p->mu); + st = grpc_connectivity_state_check(&p->state_tracker); + gpr_mu_unlock(&p->mu); + return st; +} + +static void rr_notify_on_state_change(grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify, + grpc_call_list *call_list) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + gpr_mu_lock(&p->mu); + grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, + notify, call_list); + gpr_mu_unlock(&p->mu); +} + +static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { + rr_destroy, + rr_shutdown, + rr_pick, + rr_exit_idle, + rr_broadcast, + rr_check_connectivity, + rr_notify_on_state_change}; + +static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} + +static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} + +static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + size_t i; + round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); + GPR_ASSERT(args->num_subchannels > 0); + memset(p, 0, sizeof(*p)); + grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); + p->subchannels = + gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); + p->num_subchannels = args->num_subchannels; + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, + "round_robin"); + memcpy(p->subchannels, args->subchannels, + sizeof(grpc_subchannel *) * args->num_subchannels); + + gpr_mu_init(&p->mu); + p->connectivity_changed_cbs = + gpr_malloc(sizeof(grpc_closure) * args->num_subchannels); + p->subchannel_connectivity = + gpr_malloc(sizeof(grpc_connectivity_state) * args->num_subchannels); + + p->cb_args = + gpr_malloc(sizeof(connectivity_changed_cb_arg) * args->num_subchannels); + for (i = 0; i < args->num_subchannels; i++) { + p->cb_args[i].subchannel_idx = i; + p->cb_args[i].p = p; + grpc_closure_init(&p->connectivity_changed_cbs[i], rr_connectivity_changed, + &p->cb_args[i]); + } + + /* The (dummy node) root of the ready list */ + p->ready_list.subchannel = NULL; + p->ready_list.prev = NULL; + p->ready_list.next = NULL; + p->ready_list_last_pick = &p->ready_list; + + p->subchannel_index_to_readylist_node = + gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); + memset(p->subchannel_index_to_readylist_node, 0, + sizeof(grpc_subchannel *) * args->num_subchannels); + return &p->base; +} + +static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { + round_robin_factory_ref, round_robin_factory_unref, create_round_robin, + "round_robin"}; + +static grpc_lb_policy_factory round_robin_lb_policy_factory = { + &round_robin_factory_vtable}; + +grpc_lb_policy_factory *grpc_round_robin_lb_factory_create() { + return &round_robin_lb_policy_factory; +} diff --git a/src/core/client_config/lb_policies/round_robin.h b/src/core/client_config/lb_policies/round_robin.h new file mode 100644 index 0000000000..cf1f69c85f --- /dev/null +++ b/src/core/client_config/lb_policies/round_robin.h @@ -0,0 +1,46 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_ROUND_ROBIN_H +#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_ROUND_ROBIN_H + +#include "src/core/client_config/lb_policy.h" + +extern int grpc_lb_round_robin_trace; + +#include "src/core/client_config/lb_policy_factory.h" + +/** Returns a load balancing factory for the round robin policy */ +grpc_lb_policy_factory *grpc_round_robin_lb_factory_create(); + +#endif diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 2729036d4f..f0e2076244 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -284,7 +284,7 @@ done: static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( - grpc_resolver_args *args, const char *lb_policy_name, + grpc_resolver_args *args, const char *default_lb_policy_name, int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { size_t i; int errors_found = 0; /* GPR_FALSE */ @@ -293,13 +293,34 @@ static grpc_resolver *sockaddr_create( gpr_slice_buffer path_parts; if (0 != strcmp(args->uri->authority, "")) { - gpr_log(GPR_ERROR, "authority based uri's not supported"); + gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme", + args->uri->scheme); return NULL; } r = gpr_malloc(sizeof(sockaddr_resolver)); memset(r, 0, sizeof(*r)); + r->lb_policy_name = NULL; + if (0 != strcmp(args->uri->query, "")) { + gpr_slice query_slice; + gpr_slice_buffer query_parts; + + query_slice = + gpr_slice_new(args->uri->query, strlen(args->uri->query), do_nothing); + gpr_slice_buffer_init(&query_parts); + gpr_slice_split(query_slice, "=", &query_parts); + GPR_ASSERT(query_parts.count == 2); + if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) { + r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII); + } + gpr_slice_buffer_destroy(&query_parts); + gpr_slice_unref(query_slice); + } + if (r->lb_policy_name == NULL) { + r->lb_policy_name = gpr_strdup(default_lb_policy_name); + } + path_slice = gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing); gpr_slice_buffer_init(&path_parts); @@ -332,7 +353,6 @@ static grpc_resolver *sockaddr_create( grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); r->subchannel_factory = args->subchannel_factory; grpc_subchannel_factory_ref(r->subchannel_factory); - r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; } diff --git a/src/core/httpcli/httpcli_security_connector.c b/src/core/httpcli/httpcli_security_connector.c index a11fcd3e62..a340a90c4b 100644 --- a/src/core/httpcli/httpcli_security_connector.c +++ b/src/core/httpcli/httpcli_security_connector.c @@ -35,7 +35,7 @@ #include <string.h> -#include "src/core/security/secure_transport_setup.h" +#include "src/core/security/handshake.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -58,20 +58,29 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) { gpr_free(sc); } -static grpc_security_status httpcli_ssl_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { +static void httpcli_ssl_do_handshake(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data, + grpc_call_list *call_list) { grpc_httpcli_ssl_channel_security_connector *c = (grpc_httpcli_ssl_channel_security_connector *)sc; tsi_result result = TSI_OK; - if (c->handshaker_factory == NULL) return GRPC_SECURITY_ERROR; + tsi_handshaker *handshaker; + if (c->handshaker_factory == NULL) { + cb(user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL, call_list); + return; + } result = tsi_ssl_handshaker_factory_create_handshaker( - c->handshaker_factory, c->secure_peer_name, handshaker); + c->handshaker_factory, c->secure_peer_name, &handshaker); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.", tsi_result_to_string(result)); - return GRPC_SECURITY_ERROR; + cb(user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL, call_list); + } else { + grpc_do_security_handshake(handshaker, sc, nonsecure_endpoint, cb, + user_data, call_list); } - return GRPC_SECURITY_OK; } static grpc_security_status httpcli_ssl_check_peer(grpc_security_connector *sc, @@ -94,7 +103,7 @@ static grpc_security_status httpcli_ssl_check_peer(grpc_security_connector *sc, } static grpc_security_connector_vtable httpcli_ssl_vtable = { - httpcli_ssl_destroy, httpcli_ssl_create_handshaker, httpcli_ssl_check_peer}; + httpcli_ssl_destroy, httpcli_ssl_do_handshake, httpcli_ssl_check_peer}; static grpc_security_status httpcli_ssl_channel_security_connector_create( const unsigned char *pem_root_certs, size_t pem_root_certs_size, @@ -172,8 +181,8 @@ static void ssl_handshake(void *arg, grpc_endpoint *tcp, const char *host, GPR_ASSERT(httpcli_ssl_channel_security_connector_create( pem_root_certs, pem_root_certs_size, host, &sc) == GRPC_SECURITY_OK); - grpc_setup_secure_transport(&sc->base, tcp, on_secure_transport_setup_done, c, - call_list); + grpc_security_connector_do_handshake( + &sc->base, tcp, on_secure_transport_setup_done, c, call_list); GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli"); } diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 19e58bc294..b2aab234d3 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -39,7 +39,7 @@ /* Forward decl of grpc_tcp_server */ typedef struct grpc_tcp_server grpc_tcp_server; -/* New server callback: tcp is the newly connected tcp connection */ +/* Called for newly connected TCP connections. */ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep, grpc_call_list *call_list); @@ -48,8 +48,9 @@ grpc_tcp_server *grpc_tcp_server_create(void); /* Start listening to bound ports */ void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset **pollsets, - size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg, grpc_call_list *call_list); + size_t pollset_count, + grpc_tcp_server_cb on_accept_cb, void *cb_arg, + grpc_call_list *call_list); /* Add a port to the server, returning port number on success, or negative on failure. diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 0c5e0053dd..635fdeb198 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -98,8 +98,9 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { /* the overall server */ struct grpc_tcp_server { - grpc_tcp_server_cb cb; - void *cb_arg; + /* Called whenever accept() succeeds on a server port. */ + grpc_tcp_server_cb on_accept_cb; + void *on_accept_cb_arg; gpr_mu mu; @@ -131,8 +132,8 @@ grpc_tcp_server *grpc_tcp_server_create(void) { s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; - s->cb = NULL; - s->cb_arg = NULL; + s->on_accept_cb = NULL; + s->on_accept_cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; @@ -345,8 +346,8 @@ static void on_read(void *arg, int success, grpc_call_list *call_list) { for (i = 0; i < sp->server->pollset_count; i++) { grpc_pollset_add_fd(sp->server->pollsets[i], fdobj, call_list); } - sp->server->cb( - sp->server->cb_arg, + sp->server->on_accept_cb( + sp->server->on_accept_cb_arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), call_list); @@ -378,7 +379,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb && "must add ports before starting server"); + GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); /* append it to the list under a lock */ if (s->nports == s->port_capacity) { s->port_capacity *= 2; @@ -485,15 +486,16 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) { } void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, - size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg, grpc_call_list *call_list) { + size_t pollset_count, + grpc_tcp_server_cb on_accept_cb, + void *on_accept_cb_arg, grpc_call_list *call_list) { size_t i, j; - GPR_ASSERT(cb); + GPR_ASSERT(on_accept_cb); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb); + GPR_ASSERT(!s->on_accept_cb); GPR_ASSERT(s->active_ports == 0); - s->cb = cb; - s->cb_arg = cb_arg; + s->on_accept_cb = on_accept_cb; + s->on_accept_cb_arg = on_accept_cb_arg; s->pollsets = pollsets; s->pollset_count = pollset_count; for (i = 0; i < s->nports; i++) { diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index b513d854aa..63d90cd425 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -71,8 +71,9 @@ typedef struct server_port { /* the overall server */ struct grpc_tcp_server { - grpc_tcp_server_cb cb; - void *cb_arg; + /* Called whenever accept() succeeds on a server port. */ + grpc_tcp_server_cb on_accept_cb; + void *on_accept_cb_arg; gpr_mu mu; @@ -95,8 +96,8 @@ grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); gpr_mu_init(&s->mu); s->active_ports = 0; - s->cb = NULL; - s->cb_arg = NULL; + s->on_accept_cb = NULL; + s->on_accept_cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; @@ -344,7 +345,7 @@ static void on_accept(void *arg, int from_iocp) { /* The only time we should call our callback, is where we successfully managed to accept a connection, and created an endpoint. */ - if (ep) sp->server->cb(sp->server->cb_arg, ep); + if (ep) sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep); /* As we were notified from the IOCP of one and exactly one accept, the former socked we created has now either been destroy or assigned to the new connection. We need to create a new one for the next @@ -380,7 +381,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, port = prepare_socket(sock, addr, addr_len); if (port >= 0) { gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb && "must add ports before starting server"); + GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); /* append it to the list under a lock */ if (s->nports == s->port_capacity) { s->port_capacity *= 2; @@ -462,15 +463,16 @@ SOCKET grpc_tcp_server_get_socket(grpc_tcp_server *s, unsigned index) { } void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset, - size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg) { + size_t pollset_count, + grpc_tcp_server_cb on_accept_cb, + void *on_accept_cb_arg) { size_t i; - GPR_ASSERT(cb); + GPR_ASSERT(on_accept_cb); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb); + GPR_ASSERT(!s->on_accept_cb); GPR_ASSERT(s->active_ports == 0); - s->cb = cb; - s->cb_arg = cb_arg; + s->on_accept_cb = on_accept_cb; + s->on_accept_cb_arg = on_accept_cb_arg; for (i = 0; i < s->nports; i++) { start_accept(s->ports + i); s->active_ports++; diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 1d06df8533..43a37c2077 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -63,6 +63,7 @@ typedef struct { int sent_initial_metadata; gpr_uint8 security_context_set; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; + char *service_url; } call_data; /* We can have a per-channel credentials. */ @@ -75,6 +76,13 @@ typedef struct { grpc_mdstr *status_key; } channel_data; +static void reset_service_url(call_data *calld) { + if (calld->service_url != NULL) { + gpr_free(calld->service_url); + calld->service_url = NULL; + } +} + static void bubble_up_error(grpc_call_element *elem, grpc_status_code status, const char *error_msg, grpc_call_list *call_list) { call_data *calld = elem->call_data; @@ -94,6 +102,7 @@ static void on_credentials_metadata(void *user_data, grpc_transport_stream_op *op = &calld->op; grpc_metadata_batch *mdb; size_t i; + reset_service_url(calld); if (status != GRPC_CREDENTIALS_OK) { bubble_up_error(elem, GRPC_STATUS_UNAUTHENTICATED, "Credentials failed to get metadata.", call_list); @@ -112,8 +121,7 @@ static void on_credentials_metadata(void *user_data, grpc_call_next_op(elem, op, call_list); } -static char *build_service_url(const char *url_scheme, call_data *calld) { - char *service_url; +void build_service_url(const char *url_scheme, call_data *calld) { char *service = gpr_strdup(grpc_mdstr_as_c_string(calld->method)); char *last_slash = strrchr(service, '/'); if (last_slash == NULL) { @@ -126,10 +134,10 @@ static char *build_service_url(const char *url_scheme, call_data *calld) { *last_slash = '\0'; } if (url_scheme == NULL) url_scheme = ""; - gpr_asprintf(&service_url, "%s://%s%s", url_scheme, + reset_service_url(calld); + gpr_asprintf(&calld->service_url, "%s://%s%s", url_scheme, grpc_mdstr_as_c_string(calld->host), service); gpr_free(service); - return service_url; } static void send_security_metadata(grpc_call_element *elem, @@ -139,7 +147,6 @@ static void send_security_metadata(grpc_call_element *elem, channel_data *chand = elem->channel_data; grpc_client_security_context *ctx = (grpc_client_security_context *)op->context[GRPC_CONTEXT_SECURITY].value; - char *service_url = NULL; grpc_credentials *channel_creds = chand->security_connector->request_metadata_creds; int channel_creds_has_md = @@ -168,14 +175,12 @@ static void send_security_metadata(grpc_call_element *elem, grpc_credentials_ref(call_creds_has_md ? ctx->creds : channel_creds); } - service_url = - build_service_url(chand->security_connector->base.url_scheme, calld); + build_service_url(chand->security_connector->base.url_scheme, calld); calld->op = *op; /* Copy op (originates from the caller's stack). */ GPR_ASSERT(calld->pollset); - grpc_credentials_get_request_metadata(calld->creds, calld->pollset, - service_url, on_credentials_metadata, - elem, call_list); - gpr_free(service_url); + grpc_credentials_get_request_metadata( + calld->creds, calld->pollset, calld->service_url, on_credentials_metadata, + elem, call_list); } static void on_host_checked(void *user_data, grpc_security_status status, @@ -283,13 +288,7 @@ static void init_call_elem(grpc_call_element *elem, grpc_transport_stream_op *initial_op, grpc_call_list *call_list) { call_data *calld = elem->call_data; - calld->creds = NULL; - calld->host = NULL; - calld->method = NULL; - calld->pollset = NULL; - calld->sent_initial_metadata = 0; - calld->security_context_set = 0; - + memset(calld, 0, sizeof(*calld)); GPR_ASSERT(!initial_op || !initial_op->send_ops); } @@ -304,6 +303,7 @@ static void destroy_call_elem(grpc_call_element *elem, if (calld->method != NULL) { GRPC_MDSTR_UNREF(calld->method); } + reset_service_url(calld); } /* Constructor for channel_data */ diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 086c428112..80e0a79447 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -1197,3 +1197,97 @@ grpc_credentials *grpc_google_iam_credentials_create( c->iam_md, GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, authority_selector); return &c->base; } + +/* -- Plugin credentials. -- */ + +typedef struct { + void *user_data; + grpc_credentials_metadata_cb cb; +} grpc_metadata_plugin_request; + +static void plugin_destruct(grpc_credentials *creds) { + grpc_plugin_credentials *c = (grpc_plugin_credentials *)creds; + if (c->plugin.state != NULL && c->plugin.destroy != NULL) { + c->plugin.destroy(c->plugin.state); + } +} + +static int plugin_has_request_metadata(const grpc_credentials *creds) { + return 1; +} + +static int plugin_has_request_metadata_only(const grpc_credentials *creds) { + return 1; +} + +static void plugin_md_request_metadata_ready(void *request, + const grpc_metadata *md, + size_t num_md, + grpc_status_code status, + const char *error_details) { + /* called from application code */ + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_metadata_plugin_request *r = (grpc_metadata_plugin_request *)request; + if (status != GRPC_STATUS_OK) { + if (error_details != NULL) { + gpr_log(GPR_ERROR, "Getting metadata from plugin failed with error: %s", + error_details); + } + r->cb(r->user_data, NULL, 0, GRPC_CREDENTIALS_ERROR, &call_list); + } else { + size_t i; + grpc_credentials_md *md_array = NULL; + if (num_md > 0) { + md_array = gpr_malloc(num_md * sizeof(grpc_credentials_md)); + for (i = 0; i < num_md; i++) { + md_array[i].key = gpr_slice_from_copied_string(md[i].key); + md_array[i].value = + gpr_slice_from_copied_buffer(md[i].value, md[i].value_length); + } + } + r->cb(r->user_data, md_array, num_md, GRPC_CREDENTIALS_OK, &call_list); + if (md_array != NULL) { + for (i = 0; i < num_md; i++) { + gpr_slice_unref(md_array[i].key); + gpr_slice_unref(md_array[i].value); + } + gpr_free(md_array); + } + } + gpr_free(r); +} + +static void plugin_get_request_metadata(grpc_credentials *creds, + grpc_pollset *pollset, + const char *service_url, + grpc_credentials_metadata_cb cb, + void *user_data, + grpc_call_list *call_list) { + grpc_plugin_credentials *c = (grpc_plugin_credentials *)creds; + if (c->plugin.get_metadata != NULL) { + grpc_metadata_plugin_request *request = gpr_malloc(sizeof(*request)); + memset(request, 0, sizeof(*request)); + request->user_data = user_data; + request->cb = cb; + c->plugin.get_metadata(c->plugin.state, service_url, + plugin_md_request_metadata_ready, request); + } else { + cb(user_data, NULL, 0, GRPC_CREDENTIALS_OK, call_list); + } +} + +static grpc_credentials_vtable plugin_vtable = { + plugin_destruct, plugin_has_request_metadata, + plugin_has_request_metadata_only, plugin_get_request_metadata, NULL}; + +grpc_credentials *grpc_metadata_credentials_create_from_plugin( + grpc_metadata_credentials_plugin plugin, void *reserved) { + grpc_plugin_credentials *c = gpr_malloc(sizeof(*c)); + GPR_ASSERT(reserved == NULL); + memset(c, 0, sizeof(*c)); + c->base.type = GRPC_CREDENTIALS_TYPE_METADATA_PLUGIN; + c->base.vtable = &plugin_vtable; + gpr_ref_init(&c->base.refcount, 1); + c->plugin = plugin; + return &c->base; +} diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h index a93d0ec653..090767d237 100644 --- a/src/core/security/credentials.h +++ b/src/core/security/credentials.h @@ -56,6 +56,7 @@ typedef enum { #define GRPC_CREDENTIALS_TYPE_SSL "Ssl" #define GRPC_CREDENTIALS_TYPE_OAUTH2 "Oauth2" +#define GRPC_CREDENTIALS_TYPE_METADATA_PLUGIN "Plugin" #define GRPC_CREDENTIALS_TYPE_JWT "Jwt" #define GRPC_CREDENTIALS_TYPE_IAM "Iam" #define GRPC_CREDENTIALS_TYPE_COMPOSITE "Composite" @@ -325,4 +326,12 @@ typedef struct { grpc_credentials *connector_creds; } grpc_composite_credentials; +/* -- Plugin credentials. -- */ + +typedef struct { + grpc_credentials base; + grpc_metadata_credentials_plugin plugin; + grpc_credentials_md_store *plugin_md; +} grpc_plugin_credentials; + #endif /* GRPC_INTERNAL_CORE_SECURITY_CREDENTIALS_H */ diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/handshake.c index 1412ab21ad..e3206c2e12 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/handshake.c @@ -31,7 +31,7 @@ * */ -#include "src/core/security/secure_transport_setup.h" +#include "src/core/security/handshake.h" #include <string.h> @@ -52,11 +52,11 @@ typedef struct { gpr_slice_buffer left_overs; gpr_slice_buffer incoming; gpr_slice_buffer outgoing; - grpc_secure_transport_setup_done_cb cb; + grpc_security_handshake_done_cb cb; void *user_data; grpc_closure on_handshake_data_sent_to_peer; grpc_closure on_handshake_data_received_from_peer; -} grpc_secure_transport_setup; +} grpc_security_handshake; static void on_handshake_data_received_from_peer(void *setup, int success, grpc_call_list *call_list); @@ -64,119 +64,117 @@ static void on_handshake_data_received_from_peer(void *setup, int success, static void on_handshake_data_sent_to_peer(void *setup, int success, grpc_call_list *call_list); -static void secure_transport_setup_done(grpc_secure_transport_setup *s, - int is_success, - grpc_call_list *call_list) { +static void security_handshake_done(grpc_security_handshake *h, int is_success, + grpc_call_list *call_list) { if (is_success) { - s->cb(s->user_data, GRPC_SECURITY_OK, s->wrapped_endpoint, - s->secure_endpoint, call_list); + h->cb(h->user_data, GRPC_SECURITY_OK, h->wrapped_endpoint, + h->secure_endpoint, call_list); } else { - if (s->secure_endpoint != NULL) { - grpc_endpoint_shutdown(s->secure_endpoint, call_list); - grpc_endpoint_destroy(s->secure_endpoint, call_list); + if (h->secure_endpoint != NULL) { + grpc_endpoint_shutdown(h->secure_endpoint, call_list); + grpc_endpoint_destroy(h->secure_endpoint, call_list); } else { - grpc_endpoint_destroy(s->wrapped_endpoint, call_list); + grpc_endpoint_destroy(h->wrapped_endpoint, call_list); } - s->cb(s->user_data, GRPC_SECURITY_ERROR, s->wrapped_endpoint, NULL, + h->cb(h->user_data, GRPC_SECURITY_ERROR, h->wrapped_endpoint, NULL, call_list); } - if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker); - if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer); - gpr_slice_buffer_destroy(&s->left_overs); - gpr_slice_buffer_destroy(&s->outgoing); - gpr_slice_buffer_destroy(&s->incoming); - GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup"); - gpr_free(s); + if (h->handshaker != NULL) tsi_handshaker_destroy(h->handshaker); + if (h->handshake_buffer != NULL) gpr_free(h->handshake_buffer); + gpr_slice_buffer_destroy(&h->left_overs); + gpr_slice_buffer_destroy(&h->outgoing); + gpr_slice_buffer_destroy(&h->incoming); + GRPC_SECURITY_CONNECTOR_UNREF(h->connector, "handshake"); + gpr_free(h); } static void on_peer_checked(void *user_data, grpc_security_status status, grpc_call_list *call_list) { - grpc_secure_transport_setup *s = user_data; + grpc_security_handshake *h = user_data; tsi_frame_protector *protector; tsi_result result; if (status != GRPC_SECURITY_OK) { gpr_log(GPR_ERROR, "Error checking peer."); - secure_transport_setup_done(s, 0, call_list); + security_handshake_done(h, 0, call_list); return; } result = - tsi_handshaker_create_frame_protector(s->handshaker, NULL, &protector); + tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Frame protector creation failed with error %s.", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0, call_list); + security_handshake_done(h, 0, call_list); return; } - s->secure_endpoint = - grpc_secure_endpoint_create(protector, s->wrapped_endpoint, - s->left_overs.slices, s->left_overs.count); - s->left_overs.count = 0; - s->left_overs.length = 0; - secure_transport_setup_done(s, 1, call_list); + h->secure_endpoint = + grpc_secure_endpoint_create(protector, h->wrapped_endpoint, + h->left_overs.slices, h->left_overs.count); + h->left_overs.count = 0; + h->left_overs.length = 0; + security_handshake_done(h, 1, call_list); return; } -static void check_peer(grpc_secure_transport_setup *s, - grpc_call_list *call_list) { +static void check_peer(grpc_security_handshake *h, grpc_call_list *call_list) { grpc_security_status peer_status; tsi_peer peer; - tsi_result result = tsi_handshaker_extract_peer(s->handshaker, &peer); + tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Peer extraction failed with error %s", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0, call_list); + security_handshake_done(h, 0, call_list); return; } - peer_status = grpc_security_connector_check_peer(s->connector, peer, - on_peer_checked, s); + peer_status = grpc_security_connector_check_peer(h->connector, peer, + on_peer_checked, h); if (peer_status == GRPC_SECURITY_ERROR) { gpr_log(GPR_ERROR, "Peer check failed."); - secure_transport_setup_done(s, 0, call_list); + security_handshake_done(h, 0, call_list); return; } else if (peer_status == GRPC_SECURITY_OK) { - on_peer_checked(s, peer_status, call_list); + on_peer_checked(h, peer_status, call_list); } } -static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s, +static void send_handshake_bytes_to_peer(grpc_security_handshake *h, grpc_call_list *call_list) { size_t offset = 0; tsi_result result = TSI_OK; gpr_slice to_send; do { - size_t to_send_size = s->handshake_buffer_size - offset; + size_t to_send_size = h->handshake_buffer_size - offset; result = tsi_handshaker_get_bytes_to_send_to_peer( - s->handshaker, s->handshake_buffer + offset, &to_send_size); + h->handshaker, h->handshake_buffer + offset, &to_send_size); offset += to_send_size; if (result == TSI_INCOMPLETE_DATA) { - s->handshake_buffer_size *= 2; - s->handshake_buffer = - gpr_realloc(s->handshake_buffer, s->handshake_buffer_size); + h->handshake_buffer_size *= 2; + h->handshake_buffer = + gpr_realloc(h->handshake_buffer, h->handshake_buffer_size); } } while (result == TSI_INCOMPLETE_DATA); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshake failed with error %s", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0, call_list); + security_handshake_done(h, 0, call_list); return; } to_send = - gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset); - gpr_slice_buffer_reset_and_unref(&s->outgoing); - gpr_slice_buffer_add(&s->outgoing, to_send); + gpr_slice_from_copied_buffer((const char *)h->handshake_buffer, offset); + gpr_slice_buffer_reset_and_unref(&h->outgoing); + gpr_slice_buffer_add(&h->outgoing, to_send); /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing, - &s->on_handshake_data_sent_to_peer, call_list); + grpc_endpoint_write(h->wrapped_endpoint, &h->outgoing, + &h->on_handshake_data_sent_to_peer, call_list); } -static void on_handshake_data_received_from_peer(void *setup, int success, +static void on_handshake_data_received_from_peer(void *handshake, int success, grpc_call_list *call_list) { - grpc_secure_transport_setup *s = setup; + grpc_security_handshake *h = handshake; size_t consumed_slice_size = 0; tsi_result result = TSI_OK; size_t i; @@ -185,26 +183,26 @@ static void on_handshake_data_received_from_peer(void *setup, int success, if (!success) { gpr_log(GPR_ERROR, "Read failed."); - secure_transport_setup_done(s, 0, call_list); + security_handshake_done(h, 0, call_list); return; } - for (i = 0; i < s->incoming.count; i++) { - consumed_slice_size = GPR_SLICE_LENGTH(s->incoming.slices[i]); + for (i = 0; i < h->incoming.count; i++) { + consumed_slice_size = GPR_SLICE_LENGTH(h->incoming.slices[i]); result = tsi_handshaker_process_bytes_from_peer( - s->handshaker, GPR_SLICE_START_PTR(s->incoming.slices[i]), + h->handshaker, GPR_SLICE_START_PTR(h->incoming.slices[i]), &consumed_slice_size); - if (!tsi_handshaker_is_in_progress(s->handshaker)) break; + if (!tsi_handshaker_is_in_progress(h->handshaker)) break; } - if (tsi_handshaker_is_in_progress(s->handshaker)) { + if (tsi_handshaker_is_in_progress(h->handshaker)) { /* We may need more data. */ if (result == TSI_INCOMPLETE_DATA) { - grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, - &s->on_handshake_data_received_from_peer, call_list); + grpc_endpoint_read(h->wrapped_endpoint, &h->incoming, + &h->on_handshake_data_received_from_peer, call_list); return; } else { - send_handshake_bytes_to_peer(s, call_list); + send_handshake_bytes_to_peer(h, call_list); return; } } @@ -212,80 +210,77 @@ static void on_handshake_data_received_from_peer(void *setup, int success, if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshake failed with error %s", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0, call_list); + security_handshake_done(h, 0, call_list); return; } /* Handshake is done and successful this point. */ has_left_overs_in_current_slice = - (consumed_slice_size < GPR_SLICE_LENGTH(s->incoming.slices[i])); + (consumed_slice_size < GPR_SLICE_LENGTH(h->incoming.slices[i])); num_left_overs = - (has_left_overs_in_current_slice ? 1 : 0) + s->incoming.count - i - 1; + (has_left_overs_in_current_slice ? 1 : 0) + h->incoming.count - i - 1; if (num_left_overs == 0) { - check_peer(s, call_list); + check_peer(h, call_list); return; } + /* Put the leftovers in our buffer (ownership transfered). */ if (has_left_overs_in_current_slice) { gpr_slice_buffer_add( - &s->left_overs, - gpr_slice_split_tail(&s->incoming.slices[i], consumed_slice_size)); + &h->left_overs, + gpr_slice_split_tail(&h->incoming.slices[i], consumed_slice_size)); gpr_slice_unref( - s->incoming.slices[i]); /* split_tail above increments refcount. */ + h->incoming.slices[i]); /* split_tail above increments refcount. */ } gpr_slice_buffer_addn( - &s->left_overs, &s->incoming.slices[i + 1], + &h->left_overs, &h->incoming.slices[i + 1], num_left_overs - (size_t)has_left_overs_in_current_slice); - check_peer(s, call_list); + check_peer(h, call_list); } -/* If setup is NULL, the setup is done. */ -static void on_handshake_data_sent_to_peer(void *setup, int success, +/* If handshake is NULL, the handshake is done. */ +static void on_handshake_data_sent_to_peer(void *handshake, int success, grpc_call_list *call_list) { - grpc_secure_transport_setup *s = setup; + grpc_security_handshake *h = handshake; /* Make sure that write is OK. */ if (!success) { gpr_log(GPR_ERROR, "Write failed."); - if (setup != NULL) secure_transport_setup_done(s, 0, call_list); + if (handshake != NULL) security_handshake_done(h, 0, call_list); return; } /* We may be done. */ - if (tsi_handshaker_is_in_progress(s->handshaker)) { - grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, - &s->on_handshake_data_received_from_peer, call_list); + if (tsi_handshaker_is_in_progress(h->handshaker)) { + /* TODO(klempner,jboeuf): This should probably use the client setup + deadline */ + grpc_endpoint_read(h->wrapped_endpoint, &h->incoming, + &h->on_handshake_data_received_from_peer, call_list); } else { - check_peer(s, call_list); + check_peer(h, call_list); } } -void grpc_setup_secure_transport(grpc_security_connector *connector, - grpc_endpoint *nonsecure_endpoint, - grpc_secure_transport_setup_done_cb cb, - void *user_data, grpc_call_list *call_list) { - grpc_security_status result = GRPC_SECURITY_OK; - grpc_secure_transport_setup *s = - gpr_malloc(sizeof(grpc_secure_transport_setup)); - memset(s, 0, sizeof(grpc_secure_transport_setup)); - result = grpc_security_connector_create_handshaker(connector, &s->handshaker); - if (result != GRPC_SECURITY_OK) { - secure_transport_setup_done(s, 0, call_list); - return; - } - s->connector = - GRPC_SECURITY_CONNECTOR_REF(connector, "secure_transport_setup"); - s->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; - s->handshake_buffer = gpr_malloc(s->handshake_buffer_size); - s->wrapped_endpoint = nonsecure_endpoint; - s->user_data = user_data; - s->cb = cb; - grpc_closure_init(&s->on_handshake_data_sent_to_peer, - on_handshake_data_sent_to_peer, s); - grpc_closure_init(&s->on_handshake_data_received_from_peer, - on_handshake_data_received_from_peer, s); - gpr_slice_buffer_init(&s->left_overs); - gpr_slice_buffer_init(&s->outgoing); - gpr_slice_buffer_init(&s->incoming); - send_handshake_bytes_to_peer(s, call_list); +void grpc_do_security_handshake(tsi_handshaker *handshaker, + grpc_security_connector *connector, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data, grpc_call_list *call_list) { + grpc_security_handshake *h = gpr_malloc(sizeof(grpc_security_handshake)); + memset(h, 0, sizeof(grpc_security_handshake)); + h->handshaker = handshaker; + h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake"); + h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; + h->handshake_buffer = gpr_malloc(h->handshake_buffer_size); + h->wrapped_endpoint = nonsecure_endpoint; + h->user_data = user_data; + h->cb = cb; + grpc_closure_init(&h->on_handshake_data_sent_to_peer, + on_handshake_data_sent_to_peer, h); + grpc_closure_init(&h->on_handshake_data_received_from_peer, + on_handshake_data_received_from_peer, h); + gpr_slice_buffer_init(&h->left_overs); + gpr_slice_buffer_init(&h->outgoing); + gpr_slice_buffer_init(&h->incoming); + send_handshake_bytes_to_peer(h, call_list); } diff --git a/src/core/security/secure_transport_setup.h b/src/core/security/handshake.h index 867726ff4c..d492de5059 100644 --- a/src/core/security/secure_transport_setup.h +++ b/src/core/security/handshake.h @@ -31,24 +31,17 @@ * */ -#ifndef GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H -#define GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H +#ifndef GRPC_INTERNAL_CORE_SECURITY_HANDSHAKE_H +#define GRPC_INTERNAL_CORE_SECURITY_HANDSHAKE_H #include "src/core/iomgr/endpoint.h" #include "src/core/security/security_connector.h" -/* --- Secure transport setup --- */ +/* Calls the callback upon completion. Takes owership of handshaker. */ +void grpc_do_security_handshake(tsi_handshaker *handshaker, + grpc_security_connector *connector, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data, grpc_call_list *call_list); -/* Ownership of the secure_endpoint is transfered. */ -typedef void (*grpc_secure_transport_setup_done_cb)( - void *user_data, grpc_security_status status, - grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint, - grpc_call_list *call_list); - -/* Calls the callback upon completion. */ -void grpc_setup_secure_transport(grpc_security_connector *connector, - grpc_endpoint *nonsecure_endpoint, - grpc_secure_transport_setup_done_cb cb, - void *user_data, grpc_call_list *call_list); - -#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H */ +#endif /* GRPC_INTERNAL_CORE_SECURITY_HANDSHAKE_H */ diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index dac6a5a5fc..07e2490981 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -36,6 +36,7 @@ #include <string.h> #include "src/core/security/credentials.h" +#include "src/core/security/handshake.h" #include "src/core/security/secure_endpoint.h" #include "src/core/security/security_context.h" #include "src/core/support/env.h" @@ -101,10 +102,16 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer, return NULL; } -grpc_security_status grpc_security_connector_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { - if (sc == NULL || handshaker == NULL) return GRPC_SECURITY_ERROR; - return sc->vtable->create_handshaker(sc, handshaker); +void grpc_security_connector_do_handshake(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data, + grpc_call_list *call_list) { + if (sc == NULL || nonsecure_endpoint == NULL) { + cb(user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL, call_list); + } else { + sc->vtable->do_handshake(sc, nonsecure_endpoint, cb, user_data, call_list); + } } grpc_security_status grpc_security_connector_check_peer( @@ -225,18 +232,6 @@ static void fake_server_destroy(grpc_security_connector *sc) { gpr_free(sc); } -static grpc_security_status fake_channel_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { - *handshaker = tsi_create_fake_handshaker(1); - return GRPC_SECURITY_OK; -} - -static grpc_security_status fake_server_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { - *handshaker = tsi_create_fake_handshaker(0); - return GRPC_SECURITY_OK; -} - static grpc_security_status fake_check_peer(grpc_security_connector *sc, tsi_peer peer, grpc_security_check_cb cb, @@ -286,11 +281,29 @@ static grpc_security_status fake_channel_check_call_host( } } +static void fake_channel_do_handshake(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data, + grpc_call_list *call_list) { + grpc_do_security_handshake(tsi_create_fake_handshaker(1), sc, + nonsecure_endpoint, cb, user_data, call_list); +} + +static void fake_server_do_handshake(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data, + grpc_call_list *call_list) { + grpc_do_security_handshake(tsi_create_fake_handshaker(0), sc, + nonsecure_endpoint, cb, user_data, call_list); +} + static grpc_security_connector_vtable fake_channel_vtable = { - fake_channel_destroy, fake_channel_create_handshaker, fake_check_peer}; + fake_channel_destroy, fake_channel_do_handshake, fake_check_peer}; static grpc_security_connector_vtable fake_server_vtable = { - fake_server_destroy, fake_server_create_handshaker, fake_check_peer}; + fake_server_destroy, fake_server_do_handshake, fake_check_peer}; grpc_channel_security_connector *grpc_fake_channel_security_connector_create( grpc_credentials *request_metadata_creds, int call_host_check_is_async) { @@ -372,22 +385,43 @@ static grpc_security_status ssl_create_handshaker( return GRPC_SECURITY_OK; } -static grpc_security_status ssl_channel_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { +static void ssl_channel_do_handshake(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data, + grpc_call_list *call_list) { grpc_ssl_channel_security_connector *c = (grpc_ssl_channel_security_connector *)sc; - return ssl_create_handshaker(c->handshaker_factory, 1, - c->overridden_target_name != NULL - ? c->overridden_target_name - : c->target_name, - handshaker); + tsi_handshaker *handshaker; + grpc_security_status status = ssl_create_handshaker( + c->handshaker_factory, 1, + c->overridden_target_name != NULL ? c->overridden_target_name + : c->target_name, + &handshaker); + if (status != GRPC_SECURITY_OK) { + cb(user_data, status, nonsecure_endpoint, NULL, call_list); + } else { + grpc_do_security_handshake(handshaker, sc, nonsecure_endpoint, cb, + user_data, call_list); + } } -static grpc_security_status ssl_server_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { +static void ssl_server_do_handshake(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data, + grpc_call_list *call_list) { grpc_ssl_server_security_connector *c = (grpc_ssl_server_security_connector *)sc; - return ssl_create_handshaker(c->handshaker_factory, 0, NULL, handshaker); + tsi_handshaker *handshaker; + grpc_security_status status = + ssl_create_handshaker(c->handshaker_factory, 0, NULL, &handshaker); + if (status != GRPC_SECURITY_OK) { + cb(user_data, status, nonsecure_endpoint, NULL, call_list); + } else { + grpc_do_security_handshake(handshaker, sc, nonsecure_endpoint, cb, + user_data, call_list); + } } static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) { @@ -512,10 +546,10 @@ static grpc_security_status ssl_channel_check_call_host( } static grpc_security_connector_vtable ssl_channel_vtable = { - ssl_channel_destroy, ssl_channel_create_handshaker, ssl_channel_check_peer}; + ssl_channel_destroy, ssl_channel_do_handshake, ssl_channel_check_peer}; static grpc_security_connector_vtable ssl_server_vtable = { - ssl_server_destroy, ssl_server_create_handshaker, ssl_server_check_peer}; + ssl_server_destroy, ssl_server_do_handshake, ssl_server_check_peer}; static gpr_slice default_pem_root_certs; diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h index d78d6ad69f..276fa47940 100644 --- a/src/core/security/security_connector.h +++ b/src/core/security/security_connector.h @@ -64,10 +64,19 @@ typedef void (*grpc_security_check_cb)(void *user_data, grpc_security_status status, grpc_call_list *call_list); +/* Ownership of the secure_endpoint is transfered. */ +typedef void (*grpc_security_handshake_done_cb)(void *user_data, + grpc_security_status status, + grpc_endpoint *wrapped_endpoint, + grpc_endpoint *secure_endpoint, + grpc_call_list *call_list); + typedef struct { void (*destroy)(grpc_security_connector *sc); - grpc_security_status (*create_handshaker)(grpc_security_connector *sc, - tsi_handshaker **handshaker); + void (*do_handshake)(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, void *user_data, + grpc_call_list *call_list); grpc_security_status (*check_peer)(grpc_security_connector *sc, tsi_peer peer, grpc_security_check_cb cb, void *user_data); @@ -101,9 +110,12 @@ grpc_security_connector *grpc_security_connector_ref( void grpc_security_connector_unref(grpc_security_connector *policy); #endif -/* Handshake creation. */ -grpc_security_status grpc_security_connector_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker); +/* Handshake. */ +void grpc_security_connector_do_handshake(grpc_security_connector *connector, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data, + grpc_call_list *call_list); /* Check the peer. Implementations can choose to check the peer either synchronously or diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 829082507c..2ecb615140 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -44,7 +44,6 @@ #include "src/core/security/credentials.h" #include "src/core/security/security_connector.h" #include "src/core/security/security_context.h" -#include "src/core/security/secure_transport_setup.h" #include "src/core/surface/server.h" #include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> @@ -126,11 +125,10 @@ static int remove_tcp_from_list_locked(grpc_server_secure_state *state, return -1; } -static void on_secure_transport_setup_done(void *statep, - grpc_security_status status, - grpc_endpoint *wrapped_endpoint, - grpc_endpoint *secure_endpoint, - grpc_call_list *call_list) { +static void on_secure_handshake_done(void *statep, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, + grpc_endpoint *secure_endpoint, + grpc_call_list *call_list) { grpc_server_secure_state *state = statep; grpc_transport *transport; grpc_mdctx *mdctx; @@ -170,8 +168,8 @@ static void on_accept(void *statep, grpc_endpoint *tcp, node->next = state->handshaking_tcp_endpoints; state->handshaking_tcp_endpoints = node; gpr_mu_unlock(&state->mu); - grpc_setup_secure_transport(state->sc, tcp, on_secure_transport_setup_done, - state, call_list); + grpc_security_connector_do_handshake(state->sc, tcp, on_secure_handshake_done, + state, call_list); } /* Server callback: start listening on our ports */ diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 7891669e35..3e34e84f60 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -71,6 +71,7 @@ typedef struct { gpr_mu mu; callback_phase phase; int success; + int removed; grpc_closure on_complete; grpc_alarm alarm; grpc_connectivity_state state; @@ -81,10 +82,6 @@ typedef struct { } state_watcher; static void delete_state_watcher(state_watcher *w, grpc_call_list *call_list) { - grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(w->channel)); - grpc_client_channel_del_interested_party(client_channel_elem, - grpc_cq_pollset(w->cq), call_list); GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity", call_list); gpr_mu_destroy(&w->mu); gpr_free(w); @@ -118,7 +115,17 @@ static void finished_completion(void *pw, grpc_cq_completion *ignored, static void partly_done(state_watcher *w, int due_to_completion, grpc_call_list *call_list) { int delete = 0; + grpc_channel_element *client_channel_elem = NULL; + gpr_mu_lock(&w->mu); + if (w->removed == 0) { + w->removed = 1; + client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(w->channel)); + grpc_client_channel_del_interested_party(client_channel_elem, + grpc_cq_pollset(w->cq), call_list); + } + gpr_mu_unlock(&w->mu); if (due_to_completion) { gpr_mu_lock(&w->mu); w->success = 1; @@ -174,6 +181,7 @@ void grpc_channel_watch_connectivity_state( w->phase = WAITING; w->state = last_observed_state; w->success = 0; + w->removed = 0; w->cq = cq; w->tag = tag; w->channel = channel; diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 03bd026a42..93c27c77bf 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -42,6 +42,7 @@ #include "src/core/channel/channel_stack.h" #include "src/core/client_config/lb_policy_registry.h" #include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policies/round_robin.h" #include "src/core/client_config/resolver_registry.h" #include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/client_config/resolvers/sockaddr_resolver.h" @@ -89,6 +90,7 @@ void grpc_init(void) { gpr_time_init(); grpc_lb_policy_registry_init(grpc_pick_first_lb_factory_create()); grpc_register_lb_policy(grpc_pick_first_lb_factory_create()); + grpc_register_lb_policy(grpc_round_robin_lb_factory_create()); grpc_resolver_registry_init("dns:///"); grpc_register_resolver_type(grpc_dns_resolver_factory_create()); grpc_register_resolver_type(grpc_ipv4_resolver_factory_create()); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 9f10cd29fd..c2d571d955 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -47,7 +47,6 @@ #include "src/core/iomgr/tcp_client.h" #include "src/core/security/auth_filters.h" #include "src/core/security/credentials.h" -#include "src/core/security/secure_transport_setup.h" #include "src/core/surface/channel.h" #include "src/core/transport/chttp2_transport.h" #include "src/core/tsi/transport_security_interface.h" @@ -84,11 +83,10 @@ static void connector_unref(grpc_connector *con, grpc_call_list *call_list) { } } -static void on_secure_transport_setup_done(void *arg, - grpc_security_status status, - grpc_endpoint *wrapped_endpoint, - grpc_endpoint *secure_endpoint, - grpc_call_list *call_list) { +static void on_secure_handshake_done(void *arg, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, + grpc_endpoint *secure_endpoint, + grpc_call_list *call_list) { connector *c = arg; grpc_closure *notify; gpr_mu_lock(&c->mu); @@ -97,7 +95,7 @@ static void on_secure_transport_setup_done(void *arg, gpr_mu_unlock(&c->mu); } else if (status != GRPC_SECURITY_OK) { GPR_ASSERT(c->connecting_endpoint == wrapped_endpoint); - gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); + gpr_log(GPR_ERROR, "Secure handshake failed with error %d.", status); memset(c->result, 0, sizeof(*c->result)); c->connecting_endpoint = NULL; gpr_mu_unlock(&c->mu); @@ -128,8 +126,9 @@ static void connected(void *arg, int success, grpc_call_list *call_list) { GPR_ASSERT(c->connecting_endpoint == NULL); c->connecting_endpoint = tcp; gpr_mu_unlock(&c->mu); - grpc_setup_secure_transport(&c->security_connector->base, tcp, - on_secure_transport_setup_done, c, call_list); + grpc_security_connector_do_handshake(&c->security_connector->base, tcp, + on_secure_handshake_done, c, + call_list); } else { memset(c->result, 0, sizeof(*c->result)); notify = c->notify; |