diff options
-rw-r--r-- | src/core/channel/client_channel.c | 329 | ||||
-rw-r--r-- | src/core/client_config/client_config.c | 2 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 169 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 2 | ||||
-rw-r--r-- | src/core/client_config/resolver_factory.c | 1 | ||||
-rw-r--r-- | src/core/client_config/resolver_registry.c | 79 | ||||
-rw-r--r-- | src/core/client_config/resolver_registry.h | 12 | ||||
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.c | 47 | ||||
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.h | 3 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 77 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 5 | ||||
-rw-r--r-- | src/core/client_config/subchannel_factory.c | 12 | ||||
-rw-r--r-- | src/core/client_config/uri_parser.c | 25 | ||||
-rw-r--r-- | src/core/client_config/uri_parser.h | 3 | ||||
-rw-r--r-- | src/core/surface/init.c | 4 | ||||
-rw-r--r-- | test/core/end2end/fixtures/chttp2_fullstack.c | 1 |
16 files changed, 511 insertions, 260 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 9630f6898d..965d4e53dc 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -34,6 +34,7 @@ #include "src/core/channel/client_channel.h" #include <stdio.h> +#include <string.h> #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" @@ -75,6 +76,7 @@ typedef enum { CALL_CREATED, CALL_WAITING_FOR_CONFIG, CALL_WAITING_FOR_PICK, + CALL_WAITING_FOR_CALL, CALL_ACTIVE, CALL_CANCELLED } call_state; @@ -87,17 +89,13 @@ struct call_data { call_state state; gpr_timespec deadline; - union { - struct { - /* our child call stack */ - grpc_subchannel_call *subchannel_call; - } active; - grpc_transport_stream_op waiting_op; - struct { - grpc_linked_mdelem status; - grpc_linked_mdelem details; - } cancelled; - } s; + grpc_subchannel *picked_channel; + grpc_iomgr_closure async_setup_task; + grpc_transport_stream_op waiting_op; + /* our child call stack */ + grpc_subchannel_call *subchannel_call; + grpc_linked_mdelem status; + grpc_linked_mdelem details; }; #if 0 @@ -110,9 +108,9 @@ static int prepare_activate(grpc_call_element *elem, /* no more access to calld->s.waiting allowed */ GPR_ASSERT(calld->state == CALL_WAITING); - if (calld->s.waiting_op.bind_pollset) { + if (calld->waiting_op.bind_pollset) { grpc_transport_setup_del_interested_party(chand->transport_setup, - calld->s.waiting_op.bind_pollset); + calld->waiting_op.bind_pollset); } calld->state = CALL_ACTIVE; @@ -143,7 +141,7 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) { if (chand->waiting_children[i] == calld) { grpc_transport_setup_del_interested_party( - chand->transport_setup, calld->s.waiting_op.bind_pollset); + chand->transport_setup, calld->waiting_op.bind_pollset); continue; } chand->waiting_children[new_count++] = chand->waiting_children[i]; @@ -166,15 +164,15 @@ static void handle_op_after_cancellation(grpc_call_element *elem, char status[GPR_LTOA_MIN_BUFSIZE]; grpc_metadata_batch mdb; gpr_ltoa(GRPC_STATUS_CANCELLED, status); - calld->s.cancelled.status.md = + calld->status.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); - calld->s.cancelled.details.md = + calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); - calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL; - calld->s.cancelled.status.next = &calld->s.cancelled.details; - calld->s.cancelled.details.prev = &calld->s.cancelled.status; - mdb.list.head = &calld->s.cancelled.status; - mdb.list.tail = &calld->s.cancelled.details; + calld->status.prev = calld->details.next = NULL; + calld->status.next = &calld->details; + calld->details.prev = &calld->status; + mdb.list.head = &calld->status; + mdb.list.tail = &calld->details; mdb.garbage.head = mdb.garbage.tail = NULL; mdb.deadline = gpr_inf_future; grpc_sopb_add_metadata(op->recv_ops, mdb); @@ -186,16 +184,111 @@ static void handle_op_after_cancellation(grpc_call_element *elem, } } -static void add_to_lb_policy_wait_queue_locked_state_config(channel_data *chand, call_data *calld) { - abort(); +typedef struct { + grpc_iomgr_closure closure; + grpc_call_element *elem; +} waiting_call; + +static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation); + +static void continue_with_pick(void *arg, int iomgr_success) { + waiting_call *wc = arg; + call_data *calld = wc->elem->call_data; + perform_transport_stream_op(wc->elem, &calld->waiting_op, 1); + gpr_free(wc); +} + +static void add_to_lb_policy_wait_queue_locked_state_config(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + waiting_call *wc = gpr_malloc(sizeof(*wc)); + grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc); + wc->elem = elem; + wc->closure.next = chand->waiting_for_config_closures; + chand->waiting_for_config_closures = &wc->closure; +} + +static int is_empty(void *p, int len) { + char *ptr = p; + int i; + for (i = 0; i < len; i++) { + if (ptr[i] != 0) return 0; + } + return 1; +} + +static void started_call(void *arg, int iomgr_success) { + call_data *calld = arg; + grpc_transport_stream_op op; + int have_waiting; + + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { + memset(&op, 0, sizeof(op)); + op.cancel_with_status = GRPC_STATUS_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + grpc_subchannel_call_process_op(calld->subchannel_call, &op); + } else if (calld->state == CALL_WAITING_FOR_CALL) { + have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); + if (calld->subchannel_call != NULL) { + calld->state = CALL_ACTIVE; + gpr_mu_unlock(&calld->mu_state); + if (have_waiting) { + grpc_subchannel_call_process_op(calld->subchannel_call, &calld->waiting_op); + } + } else { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + if (have_waiting) { + handle_op_after_cancellation(calld->elem, &calld->waiting_op); + } + } + } else { + GPR_ASSERT(calld->state == CALL_CANCELLED); + } +} + +static void picked_target(void *arg, int iomgr_success) { + call_data *calld = arg; + channel_data *chand = calld->elem->channel_data; + grpc_transport_stream_op op; + + if (calld->picked_channel == NULL) { + /* treat this like a cancellation */ + calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; + perform_transport_stream_op(calld->elem, &calld->waiting_op, 1); + } else { + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_CANCELLED) { + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(calld->elem, &calld->waiting_op); + } else { + GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); + calld->state = CALL_WAITING_FOR_CALL; + op = calld->waiting_op; + memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); + gpr_mu_unlock(&calld->mu_state); + grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld); + grpc_subchannel_create_call(calld->picked_channel, chand->mdctx, &op, &calld->subchannel_call, &calld->async_setup_task); + } + } } static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) { - abort(); + grpc_metadata_batch *initial_metadata; + grpc_transport_stream_op *op = &calld->waiting_op; + + GPR_ASSERT(op->bind_pollset); + GPR_ASSERT(op->send_ops); + GPR_ASSERT(op->send_ops->nops >= 1); + GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA); + initial_metadata = &op->send_ops->ops[0].data.metadata; + + grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld); + grpc_lb_policy_pick(lb_policy, op->bind_pollset, + initial_metadata, &calld->picked_channel, &calld->async_setup_task); } -static void cc_start_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; @@ -206,7 +299,8 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, gpr_mu_lock(&calld->mu_state); switch (calld->state) { case CALL_ACTIVE: - subchannel_call = calld->s.active.subchannel_call; + GPR_ASSERT(!continuation); + subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); grpc_subchannel_call_process_op(subchannel_call, op); break; @@ -214,13 +308,44 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, gpr_mu_unlock(&calld->mu_state); handle_op_after_cancellation(elem, op); break; + case CALL_WAITING_FOR_CONFIG: + case CALL_WAITING_FOR_PICK: + case CALL_WAITING_FOR_CALL: + if (!continuation) { + if (op->cancel_with_status != GRPC_STATUS_OK) { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(elem, op); + } else { + GPR_ASSERT((calld->waiting_op.send_ops == NULL) != + (op->send_ops == NULL)); + GPR_ASSERT((calld->waiting_op.recv_ops == NULL) != + (op->recv_ops == NULL)); + if (op->send_ops != NULL) { + calld->waiting_op.send_ops = op->send_ops; + calld->waiting_op.is_last_send = op->is_last_send; + calld->waiting_op.on_done_send = op->on_done_send; + } + if (op->recv_ops != NULL) { + calld->waiting_op.recv_ops = op->recv_ops; + calld->waiting_op.recv_state = op->recv_state; + calld->waiting_op.on_done_recv = op->on_done_recv; + } + gpr_mu_unlock(&calld->mu_state); + if (op->on_consumed != NULL) { + op->on_consumed->cb(op->on_consumed->cb_arg, 0); + } + } + break; + } + /* fall through */ case CALL_CREATED: if (op->cancel_with_status != GRPC_STATUS_OK) { calld->state = CALL_CANCELLED; gpr_mu_unlock(&calld->mu_state); handle_op_after_cancellation(elem, op); } else { - calld->s.waiting_op = *op; + calld->waiting_op = *op; gpr_mu_lock(&chand->mu_config); lb_policy = chand->lb_policy; @@ -235,141 +360,22 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, grpc_lb_policy_unref(lb_policy); } else { calld->state = CALL_WAITING_FOR_CONFIG; - add_to_lb_policy_wait_queue_locked_state_config(chand, calld); + add_to_lb_policy_wait_queue_locked_state_config(elem); gpr_mu_unlock(&chand->mu_config); gpr_mu_unlock(&calld->mu_state); } } break; - case CALL_WAITING_FOR_CONFIG: - case CALL_WAITING_FOR_PICK: - if (op->cancel_with_status != GRPC_STATUS_OK) { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op); - } else { - GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != - (op->send_ops == NULL)); - GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != - (op->recv_ops == NULL)); - if (op->send_ops != NULL) { - calld->s.waiting_op.send_ops = op->send_ops; - calld->s.waiting_op.is_last_send = op->is_last_send; - calld->s.waiting_op.on_done_send = op->on_done_send; - } - if (op->recv_ops != NULL) { - calld->s.waiting_op.recv_ops = op->recv_ops; - calld->s.waiting_op.recv_state = op->recv_state; - calld->s.waiting_op.on_done_recv = op->on_done_recv; - } - gpr_mu_unlock(&calld->mu_state); - if (op->on_consumed != NULL) { - op->on_consumed->cb(op->on_consumed->cb_arg, 0); - } - } - break; } +} - - - -#if 0 - gpr_mu_lock(&chand->mu); - switch (calld->state) { - case CALL_ACTIVE: - child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); - gpr_mu_unlock(&chand->mu); - child_elem->filter->start_transport_op(child_elem, op); - break; - case CALL_CREATED: - if (op->cancel_with_status != GRPC_STATUS_OK) { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&chand->mu); - handle_op_after_cancellation(elem, op); - } else { - calld->state = CALL_WAITING; - calld->s.waiting_op.bind_pollset = NULL; - if (chand->active_child) { - /* channel is connected - use the connected stack */ - if (prepare_activate(elem, chand->active_child)) { - gpr_mu_unlock(&chand->mu); - /* activate the request (pass it down) outside the lock */ - complete_activate(elem, op); - } else { - gpr_mu_unlock(&chand->mu); - } - } else { - /* check to see if we should initiate a connection (if we're not - already), - but don't do so until outside the lock to avoid re-entrancy - problems if - the callback is immediate */ - int initiate_transport_setup = 0; - if (!chand->transport_setup_initiated) { - chand->transport_setup_initiated = 1; - initiate_transport_setup = 1; - } - /* add this call to the waiting set to be resumed once we have a child - channel stack, growing the waiting set if needed */ - if (chand->waiting_child_count == chand->waiting_child_capacity) { - chand->waiting_child_capacity = - GPR_MAX(chand->waiting_child_capacity * 2, 8); - chand->waiting_children = gpr_realloc( - chand->waiting_children, - chand->waiting_child_capacity * sizeof(call_data *)); - } - calld->s.waiting_op = *op; - chand->waiting_children[chand->waiting_child_count++] = calld; - grpc_transport_setup_add_interested_party(chand->transport_setup, - op->bind_pollset); - gpr_mu_unlock(&chand->mu); - - /* finally initiate transport setup if needed */ - if (initiate_transport_setup) { - grpc_transport_setup_initiate(chand->transport_setup); - } - } - } - break; - case CALL_WAITING: - if (op->cancel_with_status != GRPC_STATUS_OK) { - waiting_op = calld->s.waiting_op; - remove_waiting_child(chand, calld); - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&chand->mu); - handle_op_after_cancellation(elem, &waiting_op); - handle_op_after_cancellation(elem, op); - } else { - GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != - (op->send_ops == NULL)); - GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != - (op->recv_ops == NULL)); - if (op->send_ops) { - calld->s.waiting_op.send_ops = op->send_ops; - calld->s.waiting_op.is_last_send = op->is_last_send; - calld->s.waiting_op.on_done_send = op->on_done_send; - } - if (op->recv_ops) { - calld->s.waiting_op.recv_ops = op->recv_ops; - calld->s.waiting_op.recv_state = op->recv_state; - calld->s.waiting_op.on_done_recv = op->on_done_recv; - } - gpr_mu_unlock(&chand->mu); - if (op->on_consumed) { - op->on_consumed->cb(op->on_consumed->cb_arg, 0); - } - } - break; - case CALL_CANCELLED: - gpr_mu_unlock(&chand->mu); - handle_op_after_cancellation(elem, op); - break; - } -#endif +static void cc_start_transport_stream_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { + perform_transport_stream_op(elem, op, 0); } static void update_state_locked(channel_data *chand) { - + gpr_log(GPR_ERROR, "update_state_locked not implemented"); } static void cc_on_config_changed(void *arg, int iomgr_success) { @@ -382,9 +388,10 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { if (chand->incoming_configuration) { lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); grpc_lb_policy_ref(lb_policy); + + grpc_client_config_unref(chand->incoming_configuration); } - grpc_client_config_unref(chand->incoming_configuration); chand->incoming_configuration = NULL; gpr_mu_lock(&chand->mu_config); @@ -402,7 +409,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { wakeup_closures = next; } - grpc_lb_policy_unref(old_lb_policy); + if (old_lb_policy) { + grpc_lb_policy_unref(old_lb_policy); + } if (iomgr_success) { grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); @@ -511,6 +520,7 @@ static void init_call_elem(grpc_call_element *elem, GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GPR_ASSERT(server_transport_data == NULL); + gpr_mu_init(&calld->mu_state); calld->elem = elem; calld->state = CALL_CREATED; calld->deadline = gpr_inf_future; @@ -527,7 +537,7 @@ static void destroy_call_elem(grpc_call_element *elem) { gpr_mu_lock(&calld->mu_state); switch (calld->state) { case CALL_ACTIVE: - subchannel_call = calld->s.active.subchannel_call; + subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); grpc_subchannel_call_unref(subchannel_call); break; @@ -537,6 +547,7 @@ static void destroy_call_elem(grpc_call_element *elem) { break; case CALL_WAITING_FOR_PICK: case CALL_WAITING_FOR_CONFIG: + case CALL_WAITING_FOR_CALL: gpr_log(GPR_ERROR, "should never reach here"); abort(); break; @@ -550,12 +561,12 @@ static void init_channel_elem(grpc_channel_element *elem, int is_last) { channel_data *chand = elem->channel_data; - GPR_ASSERT(!is_first); + memset(chand, 0, sizeof(*chand)); + GPR_ASSERT(is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); gpr_mu_init(&chand->mu_config); - chand->resolver = NULL; chand->mdctx = metadata_context; grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); } @@ -633,7 +644,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count); for (i = 0; i < waiting_child_count; i++) { - call_ops[i] = waiting_children[i]->s.waiting_op; + call_ops[i] = waiting_children[i]->waiting_op; if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { waiting_children[i] = NULL; grpc_transport_stream_op_finish_with_failure(&call_ops[i]); diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c index e7d7647b88..bc8dcec54e 100644 --- a/src/core/client_config/client_config.c +++ b/src/core/client_config/client_config.c @@ -61,7 +61,7 @@ void grpc_client_config_unref(grpc_client_config *c) { void grpc_client_config_set_lb_policy(grpc_client_config *c, grpc_lb_policy *lb_policy) { if (lb_policy) { - grpc_lb_policy_ref(c->lb_policy); + grpc_lb_policy_ref(lb_policy); } if (c->lb_policy) { grpc_lb_policy_unref(c->lb_policy); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 46732b8444..83a25a9a72 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -32,3 +32,172 @@ */ #include "src/core/client_config/lb_policies/pick_first.h" + +#include <string.h> + +#include <grpc/support/alloc.h> + +typedef struct pending_pick { + struct pending_pick *next; + grpc_pollset *pollset; + grpc_subchannel **target; + grpc_iomgr_closure *on_complete; +} pending_pick; + +typedef struct { + /** base policy: must be first */ + grpc_lb_policy base; + /** ref count */ + gpr_refcount refs; + /** all our subchannels */ + grpc_subchannel **subchannels; + size_t num_subchannels; + + grpc_iomgr_closure connectivity_changed; + + /** mutex protecting remaining members */ + gpr_mu mu; + /** the selected channel + TODO(ctiller): this should be atomically set so we don't + need to take a mutex in the common case */ + grpc_subchannel *selected; + /** have we started picking? */ + int started_picking; + /** which subchannel are we watching? */ + size_t checking_subchannel; + /** what is the connectivity of that channel? */ + grpc_connectivity_state checking_connectivity; + /** list of picks that are waiting on connectivity */ + pending_pick *pending_picks; +} pick_first_lb_policy; + +void pf_ref(grpc_lb_policy *pol) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + gpr_ref(&p->refs); +} + +void pf_unref(grpc_lb_policy *pol) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + if (gpr_unref(&p->refs)) { + gpr_free(p->subchannels); + gpr_mu_destroy(&p->mu); + gpr_free(p); + } +} + +void pf_shutdown(grpc_lb_policy *pol) { + /* pick_first_lb_policy *p = (pick_first_lb_policy*)pol; */ + abort(); +} + +void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, grpc_subchannel **target, + grpc_iomgr_closure *on_complete) { + pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + pending_pick *pp; + gpr_mu_lock(&p->mu); + if (p->selected) { + gpr_mu_unlock(&p->mu); + *target = p->selected; + on_complete->cb(on_complete->cb_arg, 1); + } else { + if (!p->started_picking) { + p->started_picking = 1; + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + pf_ref(pol); + grpc_subchannel_notify_on_state_change(p->subchannels[0], &p->checking_connectivity, &p->connectivity_changed); + } + grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset); + pp = gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->pollset = pollset; + pp->target = target; + pp->on_complete = on_complete; + p->pending_picks = pp; + gpr_mu_unlock(&p->mu); + } +} + +static void del_interested_parties_locked(pick_first_lb_policy *p) { + pending_pick *pp; + for (pp = p->pending_picks; pp; pp = pp->next) { + grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], pp->pollset); + } +} + +static void add_interested_parties_locked(pick_first_lb_policy *p) { + pending_pick *pp; + for (pp = p->pending_picks; pp; pp = pp->next) { + grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pp->pollset); + } +} + +static void pf_connectivity_changed(void *arg, int iomgr_success) { + pick_first_lb_policy *p = arg; + pending_pick *pp; + int unref = 0; + + gpr_mu_lock(&p->mu); +loop: + switch (p->checking_connectivity) { + case GRPC_CHANNEL_READY: + p->selected = p->subchannels[p->checking_connectivity]; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = p->selected; + grpc_subchannel_del_interested_party(p->selected, pp->pollset); + grpc_iomgr_add_delayed_callback(pp->on_complete, 1); + gpr_free(pp); + } + unref = 1; + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + del_interested_parties_locked(p); + p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; + p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]); + add_interested_parties_locked(p); + goto loop; + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: + grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); + break; + case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked(p); + GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]); + p->checking_subchannel %= p->num_subchannels; + p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]); + p->num_subchannels--; + grpc_subchannel_unref(p->subchannels[p->num_subchannels]); + add_interested_parties_locked(p); + if (p->num_subchannels == 0) { + abort(); + } else { + goto loop; + } + } + gpr_mu_unlock(&p->mu); + + if (unref) { + pf_unref(&p->base); + } +} + +static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { + pf_ref, pf_unref, pf_shutdown, pf_pick}; + +grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, + size_t num_subchannels) { + pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); + GPR_ASSERT(num_subchannels); + memset(p, 0, sizeof(*p)); + p->base.vtable = &pick_first_lb_policy_vtable; + gpr_ref_init(&p->refs, 1); + p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels); + p->num_subchannels = num_subchannels; + memcpy(p->subchannels, subchannels, + sizeof(grpc_subchannel *) * num_subchannels); + grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); + gpr_mu_init(&p->mu); + return &p->base; +} diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 4e185d9086..42929e933b 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -45,7 +45,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, grpc_status_code status, const char *errmsg); struct grpc_lb_policy { - grpc_lb_policy_vtable *vtable; + const grpc_lb_policy_vtable *vtable; }; struct grpc_lb_policy_vtable { diff --git a/src/core/client_config/resolver_factory.c b/src/core/client_config/resolver_factory.c index 378529d5b2..6721977e21 100644 --- a/src/core/client_config/resolver_factory.c +++ b/src/core/client_config/resolver_factory.c @@ -45,5 +45,6 @@ void grpc_resolver_factory_unref(grpc_resolver_factory *factory) { grpc_resolver *grpc_resolver_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory) { + if (!factory) return NULL; return factory->vtable->create_resolver(factory, uri, subchannel_factory); } diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c index 770d4aca8e..abdb5f9377 100644 --- a/src/core/client_config/resolver_registry.c +++ b/src/core/client_config/resolver_registry.c @@ -46,57 +46,76 @@ typedef struct { grpc_resolver_factory *factory; } registered_resolver; -static registered_resolver all_of_the_resolvers[MAX_RESOLVERS]; -static int number_of_resolvers = 0; +static registered_resolver g_all_of_the_resolvers[MAX_RESOLVERS]; +static int g_number_of_resolvers = 0; -void grpc_resolver_registry_init(grpc_resolver_factory *r) { - number_of_resolvers = 0; - grpc_register_resolver_type("default-grpc-resolver", r); +static char *g_default_resolver_scheme; + +void grpc_resolver_registry_init(const char *default_resolver_scheme) { + g_number_of_resolvers = 0; + g_default_resolver_scheme = gpr_strdup(default_resolver_scheme); } void grpc_resolver_registry_shutdown(void) { int i; - for (i = 0; i < number_of_resolvers; i++) { - gpr_free(all_of_the_resolvers[i].scheme); - grpc_resolver_factory_unref(all_of_the_resolvers[i].factory); + for (i = 0; i < g_number_of_resolvers; i++) { + gpr_free(g_all_of_the_resolvers[i].scheme); + grpc_resolver_factory_unref(g_all_of_the_resolvers[i].factory); } + gpr_free(g_default_resolver_scheme); } void grpc_register_resolver_type(const char *scheme, grpc_resolver_factory *factory) { int i; - for (i = 0; i < number_of_resolvers; i++) { - GPR_ASSERT(0 != strcmp(scheme, all_of_the_resolvers[i].scheme)); + for (i = 0; i < g_number_of_resolvers; i++) { + GPR_ASSERT(0 != strcmp(scheme, g_all_of_the_resolvers[i].scheme)); } - GPR_ASSERT(number_of_resolvers != MAX_RESOLVERS); - all_of_the_resolvers[number_of_resolvers].scheme = gpr_strdup(scheme); + GPR_ASSERT(g_number_of_resolvers != MAX_RESOLVERS); + g_all_of_the_resolvers[g_number_of_resolvers].scheme = gpr_strdup(scheme); grpc_resolver_factory_ref(factory); - all_of_the_resolvers[number_of_resolvers].factory = factory; - number_of_resolvers++; + g_all_of_the_resolvers[g_number_of_resolvers].factory = factory; + g_number_of_resolvers++; +} + +static grpc_resolver_factory *lookup_factory(grpc_uri *uri) { + int i; + + /* handling NULL uri's here simplifies grpc_resolver_create */ + if (!uri) return NULL; + + for (i = 0; i < g_number_of_resolvers; i++) { + if (0 == strcmp(uri->scheme, g_all_of_the_resolvers[i].scheme)) { + return g_all_of_the_resolvers[i].factory; + } + } + + return NULL; } grpc_resolver *grpc_resolver_create( const char *name, grpc_subchannel_factory *subchannel_factory) { grpc_uri *uri; - int i; char *tmp; - grpc_resolver *resolver = NULL; - if (grpc_has_scheme(name)) { - uri = grpc_uri_parse(name); - if (!uri) { - return NULL; - } - for (i = 0; i < number_of_resolvers; i++) { - if (0 == strcmp(all_of_the_resolvers[i].scheme, uri->scheme)) { - grpc_resolver_factory_create_resolver(all_of_the_resolvers[i].factory, - uri, subchannel_factory); - } + grpc_resolver_factory *factory = NULL; + grpc_resolver *resolver; + + uri = grpc_uri_parse(name); + factory = lookup_factory(uri); + if (factory == NULL && g_default_resolver_scheme != NULL) { + grpc_uri_destroy(uri); + gpr_asprintf(&tmp, "%s%s", g_default_resolver_scheme, name); + uri = grpc_uri_parse(tmp); + factory = lookup_factory(uri); + if (factory == NULL) { + gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", name, tmp); } - } else { - gpr_asprintf(&tmp, "default-grpc-resolver:%s", name); - GPR_ASSERT(grpc_has_scheme(tmp)); - resolver = grpc_resolver_create(tmp, subchannel_factory); gpr_free(tmp); + } else if (factory == NULL) { + gpr_log(GPR_ERROR, "don't know how to resolve '%s'", name); } + resolver = + grpc_resolver_factory_create_resolver(factory, uri, subchannel_factory); + grpc_uri_destroy(uri); return resolver; } diff --git a/src/core/client_config/resolver_registry.h b/src/core/client_config/resolver_registry.h index 53335172da..31aa47620a 100644 --- a/src/core/client_config/resolver_registry.h +++ b/src/core/client_config/resolver_registry.h @@ -36,7 +36,7 @@ #include "src/core/client_config/resolver_factory.h" -void grpc_resolver_registry_init(grpc_resolver_factory *default_resolver); +void grpc_resolver_registry_init(const char *default_prefix); void grpc_resolver_registry_shutdown(void); /** Register a resolver type. @@ -47,7 +47,15 @@ void grpc_resolver_registry_shutdown(void); void grpc_register_resolver_type(const char *scheme, grpc_resolver_factory *factory); -/** Create a resolver given a \a uri string (with an optional scheme prefix) */ +/** Create a resolver given \a name. + First tries to parse \a name as a URI. If this succeeds, tries + to locate a registered resolver factory based on the URI scheme. + If parsing or location fails, prefixes default_prefix from + grpc_resolver_registry_init to name, and tries again (if default_prefix + was not NULL). + If a resolver factory was found, use it to instantiate a resolver and + return it. + If a resolver factory was not found, return NULL. */ grpc_resolver *grpc_resolver_create( const char *name, grpc_subchannel_factory *subchannel_factory); diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 95f38ecab7..ba82675275 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -38,6 +38,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> +#include "src/core/client_config/lb_policies/pick_first.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" @@ -177,6 +178,8 @@ static void dns_start_resolving_locked(dns_resolver *r) { static void dns_maybe_finish_next_locked(dns_resolver *r) { if (r->next_completion != NULL && r->resolved_version != r->published_version) { + *r->target_config = r->resolved_config; + grpc_client_config_ref(r->resolved_config); grpc_iomgr_add_callback(r->next_completion); r->next_completion = NULL; r->published_version = r->resolved_version; @@ -191,8 +194,11 @@ static void dns_destroy(dns_resolver *r) { gpr_free(r); } -static grpc_resolver *dns_create(grpc_uri *uri, const char *default_port, - grpc_subchannel_factory *subchannel_factory) { +static grpc_resolver *dns_create( + grpc_uri *uri, const char *default_port, + grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, + size_t num_subchannels), + grpc_subchannel_factory *subchannel_factory) { dns_resolver *r; const char *path = uri->path; @@ -211,6 +217,7 @@ static grpc_resolver *dns_create(grpc_uri *uri, const char *default_port, r->name = gpr_strdup(path); r->default_port = gpr_strdup(default_port); r->subchannel_factory = subchannel_factory; + r->lb_policy_factory = lb_policy_factory; grpc_subchannel_factory_ref(subchannel_factory); return &r->base; } @@ -219,43 +226,21 @@ static grpc_resolver *dns_create(grpc_uri *uri, const char *default_port, * FACTORY */ -typedef struct { - /** base: must be first */ - grpc_resolver_factory base; - /** ref count */ - gpr_refcount refs; - /** default port */ - char *default_port; -} dns_resolver_factory; - -static void dns_factory_ref(grpc_resolver_factory *factory) { - dns_resolver_factory *f = (dns_resolver_factory *)factory; - gpr_ref(&f->refs); -} +static void dns_factory_ref(grpc_resolver_factory *factory) {} -static void dns_factory_unref(grpc_resolver_factory *factory) { - dns_resolver_factory *f = (dns_resolver_factory *)factory; - if (gpr_unref(&f->refs)) { - gpr_free(f->default_port); - gpr_free(f); - } -} +static void dns_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *dns_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory) { - dns_resolver_factory *f = (dns_resolver_factory *)factory; - return dns_create(uri, f->default_port, subchannel_factory); + return dns_create(uri, "https", grpc_create_pick_first_lb_policy, + subchannel_factory); } static const grpc_resolver_factory_vtable dns_factory_vtable = { dns_factory_ref, dns_factory_unref, dns_factory_create_resolver}; +static grpc_resolver_factory dns_resolver_factory = {&dns_factory_vtable}; -grpc_resolver_factory *grpc_dns_resolver_factory_create( - const char *default_port) { - dns_resolver_factory *f = gpr_malloc(sizeof(*f)); - memset(f, 0, sizeof(*f)); - f->base.vtable = &dns_factory_vtable; - f->default_port = gpr_strdup(default_port); - return &f->base; +grpc_resolver_factory *grpc_dns_resolver_factory_create() { + return &dns_resolver_factory; } diff --git a/src/core/client_config/resolvers/dns_resolver.h b/src/core/client_config/resolvers/dns_resolver.h index 9881448fd8..09322b16cf 100644 --- a/src/core/client_config/resolvers/dns_resolver.h +++ b/src/core/client_config/resolvers/dns_resolver.h @@ -37,7 +37,6 @@ #include "src/core/client_config/resolver_factory.h" /** Create a dns resolver for \a name */ -grpc_resolver_factory *grpc_dns_resolver_factory_create( - const char *default_port); +grpc_resolver_factory *grpc_dns_resolver_factory_create(void); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_DNS_RESOLVER_H */ diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 037f0c0ab0..9637cf39fe 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -37,6 +37,13 @@ #include <grpc/support/alloc.h> +#include "src/core/channel/channel_args.h" + +typedef struct { + gpr_refcount refs; + grpc_subchannel *subchannel; +} connection; + struct grpc_subchannel { gpr_refcount refs; grpc_connector *connector; @@ -49,24 +56,45 @@ struct grpc_subchannel { /** address to connect to */ struct sockaddr *addr; size_t addr_len; + + /** set during connection */ + grpc_transport *connecting_transport; + + /** callback for connection finishing */ + grpc_iomgr_closure connected; + + /** mutex protecting remaining elements */ + gpr_mu mu; + + /** active connection */ + connection *active; + /** are we connecting */ + int connecting; + /** closures waiting for a connection */ + grpc_iomgr_closure *waiting; }; struct grpc_subchannel_call { - grpc_subchannel *subchannel; + connection *connection; gpr_refcount refs; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) (((grpc_call_stack *)(call)) + 1) +static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op); + /* * grpc_subchannel implementation */ -void grpc_subchannel_ref(grpc_subchannel *channel) { gpr_ref(&channel->refs); } +void grpc_subchannel_ref(grpc_subchannel *c) { gpr_ref(&c->refs); } -void grpc_subchannel_unref(grpc_subchannel *channel) { - if (gpr_unref(&channel->refs)) { - gpr_free(channel); +void grpc_subchannel_unref(grpc_subchannel *c) { + if (gpr_unref(&c->refs)) { + gpr_free(c->filters); + grpc_channel_args_destroy(c->args); + gpr_free(c->addr); + gpr_free(c); } } @@ -84,9 +112,39 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); c->addr_len = args->addr_len; + c->args = grpc_channel_args_copy(args->args); + gpr_mu_init(&c->mu); return c; } +void grpc_subchannel_create_call(grpc_subchannel *c, + grpc_mdctx *mdctx, + grpc_transport_stream_op *initial_op, + grpc_subchannel_call **target, + grpc_iomgr_closure *notify) { + connection *con; + gpr_mu_lock(&c->mu); + if (c->active != NULL) { + con = c->active; + gpr_ref(&con->refs); + gpr_mu_unlock(&c->mu); + + *target = create_call(con, initial_op); + notify->cb(notify->cb_arg, 1); + } else { + notify->next = c->waiting; + c->waiting = notify; + if (!c->connecting) { + c->connecting = 1; + gpr_mu_unlock(&c->mu); + + grpc_connector_connect(c->connector, c->args, mdctx, &c->connecting_transport, &c->connected); + } else { + gpr_mu_unlock(&c->mu); + } + } +} + /* * grpc_subchannel_call implementation */ @@ -98,7 +156,9 @@ void grpc_subchannel_call_ref(grpc_subchannel_call *call) { void grpc_subchannel_call_unref(grpc_subchannel_call *call) { if (gpr_unref(&call->refs)) { grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call)); - grpc_subchannel_unref(call->subchannel); + if (gpr_unref(&call->connection->refs)) { + gpr_free(call->connection); + } gpr_free(call); } } @@ -109,3 +169,8 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call, grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); top_elem->filter->start_transport_stream_op(top_elem, op); } + +grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) { + abort(); + return NULL; +} diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b4d40eadfa..8b3d82eb0a 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -59,9 +59,12 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, grpc_connectivity_state *state, grpc_iomgr_closure *notify); +void grpc_subchannel_add_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); +void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); + /** construct a call (possibly asynchronously) */ void grpc_subchannel_create_call(grpc_subchannel *subchannel, - grpc_call_element *parent, + grpc_mdctx *mdctx, grpc_transport_stream_op *initial_op, grpc_subchannel_call **target, grpc_iomgr_closure *notify); diff --git a/src/core/client_config/subchannel_factory.c b/src/core/client_config/subchannel_factory.c index 06e8a4bdaf..f71386594c 100644 --- a/src/core/client_config/subchannel_factory.c +++ b/src/core/client_config/subchannel_factory.c @@ -32,3 +32,15 @@ */ #include "src/core/client_config/subchannel_factory.h" + +void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory) { + factory->vtable->ref(factory); +} +void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory) { + factory->vtable->unref(factory); +} + +grpc_subchannel *grpc_subchannel_factory_create_subchannel( + grpc_subchannel_factory *factory, grpc_subchannel_args *args) { + return factory->vtable->create_subchannel(factory, args); +} diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c index 24dfaae497..43b5b47f55 100644 --- a/src/core/client_config/uri_parser.c +++ b/src/core/client_config/uri_parser.c @@ -64,30 +64,6 @@ static char *copy_fragment(const char *src, int begin, int end) { return out; } -int grpc_has_scheme(const char *uri_text) { - int scheme_begin = 0; - int scheme_end = -1; - int i; - - for (i = scheme_begin; uri_text[i] != 0; i++) { - if (uri_text[i] == ':') { - scheme_end = i; - break; - } - if (uri_text[i] >= 'a' && uri_text[i] <= 'z') continue; - if (uri_text[i] >= 'A' && uri_text[i] <= 'Z') continue; - if (i != scheme_begin) { - if (uri_text[i] >= '0' && uri_text[i] <= '9') continue; - if (uri_text[i] == '+') continue; - if (uri_text[i] == '-') continue; - if (uri_text[i] == '.') continue; - } - break; - } - - return scheme_end != -1; -} - grpc_uri *grpc_uri_parse(const char *uri_text) { grpc_uri *uri; int scheme_begin = 0; @@ -162,6 +138,7 @@ grpc_uri *grpc_uri_parse(const char *uri_text) { } void grpc_uri_destroy(grpc_uri *uri) { + if (!uri) return; gpr_free(uri->scheme); gpr_free(uri->authority); gpr_free(uri->path); diff --git a/src/core/client_config/uri_parser.h b/src/core/client_config/uri_parser.h index 0e21d568a9..b6821f9621 100644 --- a/src/core/client_config/uri_parser.h +++ b/src/core/client_config/uri_parser.h @@ -43,9 +43,6 @@ typedef struct { /** parse a uri, return NULL on failure */ grpc_uri *grpc_uri_parse(const char *uri_text); -/** return 1 if uri_text has something that is likely a scheme, 0 otherwise */ -int grpc_has_scheme(const char *uri_text); - /** destroy a uri */ void grpc_uri_destroy(grpc_uri *uri); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index ca61a38a35..03add81466 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -34,6 +34,8 @@ #include <grpc/census.h> #include <grpc/grpc.h> #include "src/core/channel/channel_stack.h" +#include "src/core/client_config/resolver_registry.h" +#include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/debug/trace.h" #include "src/core/iomgr/iomgr.h" #include "src/core/profiling/timers.h" @@ -56,6 +58,8 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { + grpc_resolver_registry_init("dns:///"); + grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create()); grpc_register_tracer("channel", &grpc_trace_channel); grpc_register_tracer("surface", &grpc_surface_trace); grpc_register_tracer("http", &grpc_http_trace); diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c index e647434509..8a1530e63b 100644 --- a/test/core/end2end/fixtures/chttp2_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_fullstack.c @@ -73,6 +73,7 @@ void chttp2_init_client_fullstack(grpc_end2end_test_fixture *f, grpc_channel_args *client_args) { fullstack_fixture_data *ffd = f->fixture_data; f->client = grpc_channel_create(ffd->localaddr, client_args); + GPR_ASSERT(f->client); } void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f, |