aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--src/core/channel/client_channel.c801
1 files changed, 434 insertions, 367 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 711e105464..f890f99237 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -34,13 +34,15 @@
#include "src/core/channel/client_channel.h"
#include <stdio.h>
+#include <string.h>
#include "src/core/channel/channel_args.h"
-#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
+#include "src/core/surface/channel.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/pollset_set.h"
#include "src/core/support/string.h"
+#include "src/core/transport/connectivity_state.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@@ -51,31 +53,38 @@
typedef struct call_data call_data;
typedef struct {
- /* protects children, child_count, child_capacity, active_child,
- transport_setup_initiated
- does not protect channel stacks held by children
- transport_setup is assumed to be set once during construction */
- gpr_mu mu;
-
- /* the sending child (may be null) */
- grpc_child_channel *active_child;
+ /** metadata context for this channel */
grpc_mdctx *mdctx;
-
- /* calls waiting for a channel to be ready */
- call_data **waiting_children;
- size_t waiting_child_count;
- size_t waiting_child_capacity;
-
- /* transport setup for this channel */
- grpc_transport_setup *transport_setup;
- int transport_setup_initiated;
-
- grpc_channel_args *args;
+ /** resolver for this channel */
+ grpc_resolver *resolver;
+ /** master channel - the grpc_channel instance that ultimately owns
+ this channel_data via its channel stack.
+ We occasionally use this to bump the refcount on the master channel
+ to keep ourselves alive through an asynchronous operation. */
+ grpc_channel *master;
+
+ /** mutex protecting client configuration, including all
+ variables below in this data structure */
+ gpr_mu mu_config;
+ /** currently active load balancer - guarded by mu_config */
+ grpc_lb_policy *lb_policy;
+ /** incoming configuration - set by resolver.next
+ guarded by mu_config */
+ grpc_client_config *incoming_configuration;
+ /** a list of closures that are all waiting for config to come in */
+ grpc_iomgr_closure *waiting_for_config_closures;
+ /** resolver callback */
+ grpc_iomgr_closure on_config_changed;
+ /** connectivity state being tracked */
+ grpc_connectivity_state_tracker state_tracker;
} channel_data;
typedef enum {
CALL_CREATED,
- CALL_WAITING,
+ CALL_WAITING_FOR_SEND,
+ CALL_WAITING_FOR_CONFIG,
+ CALL_WAITING_FOR_PICK,
+ CALL_WAITING_FOR_CALL,
CALL_ACTIVE,
CALL_CANCELLED
} call_state;
@@ -84,75 +93,25 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
+ gpr_mu mu_state;
+
call_state state;
gpr_timespec deadline;
- union {
- struct {
- /* our child call stack */
- grpc_child_call *child_call;
- } active;
- grpc_transport_op waiting_op;
- struct {
- grpc_linked_mdelem status;
- grpc_linked_mdelem details;
- } cancelled;
- } s;
+ grpc_subchannel *picked_channel;
+ grpc_iomgr_closure async_setup_task;
+ grpc_transport_stream_op waiting_op;
+ /* our child call stack */
+ grpc_subchannel_call *subchannel_call;
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem details;
};
-static int prepare_activate(grpc_call_element *elem,
- grpc_child_channel *on_child) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- if (calld->state == CALL_CANCELLED) return 0;
-
- /* no more access to calld->s.waiting allowed */
- GPR_ASSERT(calld->state == CALL_WAITING);
-
- if (calld->s.waiting_op.bind_pollset) {
- grpc_transport_setup_del_interested_party(chand->transport_setup,
- calld->s.waiting_op.bind_pollset);
- }
-
- calld->state = CALL_ACTIVE;
-
- /* create a child call */
- /* TODO(ctiller): pass the waiting op down here */
- calld->s.active.child_call =
- grpc_child_channel_create_call(on_child, elem, NULL);
-
- return 1;
-}
-
-static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
- call_data *calld = elem->call_data;
- grpc_call_element *child_elem =
- grpc_child_call_get_top_element(calld->s.active.child_call);
-
- GPR_ASSERT(calld->state == CALL_ACTIVE);
-
- /* continue the start call down the stack, this nees to happen after metadata
- are flushed*/
- child_elem->filter->start_transport_op(child_elem, op);
-}
-
-static void remove_waiting_child(channel_data *chand, call_data *calld) {
- size_t new_count;
- size_t i;
- for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
- if (chand->waiting_children[i] == calld) {
- grpc_transport_setup_del_interested_party(
- chand->transport_setup, calld->s.waiting_op.bind_pollset);
- continue;
- }
- chand->waiting_children[new_count++] = chand->waiting_children[i];
- }
- GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
- new_count == chand->waiting_child_count);
- chand->waiting_child_count = new_count;
-}
+static grpc_iomgr_closure *merge_into_waiting_op(
+ grpc_call_element *elem,
+ grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT;
static void handle_op_after_cancellation(grpc_call_element *elem,
- grpc_transport_op *op) {
+ grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_ops) {
@@ -163,15 +122,15 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
char status[GPR_LTOA_MIN_BUFSIZE];
grpc_metadata_batch mdb;
gpr_ltoa(GRPC_STATUS_CANCELLED, status);
- calld->s.cancelled.status.md =
+ calld->status.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
- calld->s.cancelled.details.md =
+ calld->details.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
- calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
- calld->s.cancelled.status.next = &calld->s.cancelled.details;
- calld->s.cancelled.details.prev = &calld->s.cancelled.status;
- mdb.list.head = &calld->s.cancelled.status;
- mdb.list.tail = &calld->s.cancelled.details;
+ calld->status.prev = calld->details.next = NULL;
+ calld->status.next = &calld->details;
+ calld->details.prev = &calld->status;
+ mdb.list.head = &calld->status;
+ mdb.list.tail = &calld->details;
mdb.garbage.head = mdb.garbage.tail = NULL;
mdb.deadline = gpr_inf_future;
grpc_sopb_add_metadata(op->recv_ops, mdb);
@@ -183,192 +142,372 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
}
}
-static void cc_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
+typedef struct {
+ grpc_iomgr_closure closure;
+ grpc_call_element *elem;
+} waiting_call;
+
+static void perform_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op,
+ int continuation);
+
+static void continue_with_pick(void *arg, int iomgr_success) {
+ waiting_call *wc = arg;
+ call_data *calld = wc->elem->call_data;
+ perform_transport_stream_op(wc->elem, &calld->waiting_op, 1);
+ gpr_free(wc);
+}
+
+static void add_to_lb_policy_wait_queue_locked_state_config(
+ grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ waiting_call *wc = gpr_malloc(sizeof(*wc));
+ grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
+ wc->elem = elem;
+ wc->closure.next = chand->waiting_for_config_closures;
+ chand->waiting_for_config_closures = &wc->closure;
+}
+
+static int is_empty(void *p, int len) {
+ char *ptr = p;
+ int i;
+ for (i = 0; i < len; i++) {
+ if (ptr[i] != 0) return 0;
+ }
+ return 1;
+}
+
+static void started_call(void *arg, int iomgr_success) {
+ call_data *calld = arg;
+ grpc_transport_stream_op op;
+ int have_waiting;
+
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_subchannel_call_process_op(calld->subchannel_call, &op);
+ } else if (calld->state == CALL_WAITING_FOR_CALL) {
+ have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
+ if (calld->subchannel_call != NULL) {
+ calld->state = CALL_ACTIVE;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ grpc_subchannel_call_process_op(calld->subchannel_call,
+ &calld->waiting_op);
+ }
+ } else {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+ }
+ }
+ } else {
+ GPR_ASSERT(calld->state == CALL_CANCELLED);
+ gpr_mu_unlock(&calld->mu_state);
+ }
+}
+
+static void picked_target(void *arg, int iomgr_success) {
+ call_data *calld = arg;
+ grpc_pollset *pollset;
+
+ if (calld->picked_channel == NULL) {
+ /* treat this like a cancellation */
+ calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
+ perform_transport_stream_op(calld->elem, &calld->waiting_op, 1);
+ } else {
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_CANCELLED) {
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+ } else {
+ GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
+ calld->state = CALL_WAITING_FOR_CALL;
+ pollset = calld->waiting_op.bind_pollset;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
+ grpc_subchannel_create_call(calld->picked_channel, pollset,
+ &calld->subchannel_call,
+ &calld->async_setup_task);
+ }
+ }
+}
+
+static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
+ grpc_metadata_batch *initial_metadata;
+ grpc_transport_stream_op *op = &calld->waiting_op;
+
+ GPR_ASSERT(op->bind_pollset);
+ GPR_ASSERT(op->send_ops);
+ GPR_ASSERT(op->send_ops->nops >= 1);
+ GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
+ initial_metadata = &op->send_ops->ops[0].data.metadata;
+
+ grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
+ grpc_lb_policy_pick(lb_policy, op->bind_pollset, initial_metadata,
+ &calld->picked_channel, &calld->async_setup_task);
+}
+
+static grpc_iomgr_closure *merge_into_waiting_op(
+ grpc_call_element *elem, grpc_transport_stream_op *new_op) {
+ call_data *calld = elem->call_data;
+ grpc_iomgr_closure *consumed_op = NULL;
+ grpc_transport_stream_op *waiting_op = &calld->waiting_op;
+ GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
+ GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
+ if (new_op->send_ops != NULL) {
+ waiting_op->send_ops = new_op->send_ops;
+ waiting_op->is_last_send = new_op->is_last_send;
+ waiting_op->on_done_send = new_op->on_done_send;
+ }
+ if (new_op->recv_ops != NULL) {
+ waiting_op->recv_ops = new_op->recv_ops;
+ waiting_op->recv_state = new_op->recv_state;
+ waiting_op->on_done_recv = new_op->on_done_recv;
+ }
+ if (new_op->on_consumed != NULL) {
+ if (waiting_op->on_consumed != NULL) {
+ consumed_op = waiting_op->on_consumed;
+ }
+ waiting_op->on_consumed = new_op->on_consumed;
+ }
+ if (new_op->cancel_with_status != GRPC_STATUS_OK) {
+ waiting_op->cancel_with_status = new_op->cancel_with_status;
+ }
+ return consumed_op;
+}
+
+static void perform_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op,
+ int continuation) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- grpc_call_element *child_elem;
- grpc_transport_op waiting_op;
+ grpc_subchannel_call *subchannel_call;
+ grpc_lb_policy *lb_policy;
+ grpc_transport_stream_op op2;
+ grpc_iomgr_closure *consumed_op = NULL;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- gpr_mu_lock(&chand->mu);
+ gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
- child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
- gpr_mu_unlock(&chand->mu);
- child_elem->filter->start_transport_op(child_elem, op);
+ GPR_ASSERT(!continuation);
+ subchannel_call = calld->subchannel_call;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_subchannel_call_process_op(subchannel_call, op);
break;
- case CALL_CREATED:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, op);
- } else {
- calld->state = CALL_WAITING;
- calld->s.waiting_op.bind_pollset = NULL;
- if (chand->active_child) {
- /* channel is connected - use the connected stack */
- if (prepare_activate(elem, chand->active_child)) {
- gpr_mu_unlock(&chand->mu);
- /* activate the request (pass it down) outside the lock */
- complete_activate(elem, op);
- } else {
- gpr_mu_unlock(&chand->mu);
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(elem, op);
+ break;
+ case CALL_WAITING_FOR_SEND:
+ GPR_ASSERT(!continuation);
+ consumed_op = merge_into_waiting_op(elem, op);
+ if (!calld->waiting_op.send_ops &&
+ calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
+ gpr_mu_unlock(&calld->mu_state);
+ break;
+ }
+ *op = calld->waiting_op;
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
+ continuation = 1;
+ /* fall through */
+ case CALL_WAITING_FOR_CONFIG:
+ case CALL_WAITING_FOR_PICK:
+ case CALL_WAITING_FOR_CALL:
+ if (!continuation) {
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ op2 = calld->waiting_op;
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
+ if (op->on_consumed) {
+ calld->waiting_op.on_consumed = op->on_consumed;
+ op->on_consumed = NULL;
+ } else if (op2.on_consumed) {
+ calld->waiting_op.on_consumed = op2.on_consumed;
+ op2.on_consumed = NULL;
}
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(elem, op);
+ handle_op_after_cancellation(elem, &op2);
} else {
- /* check to see if we should initiate a connection (if we're not
- already),
- but don't do so until outside the lock to avoid re-entrancy
- problems if
- the callback is immediate */
- int initiate_transport_setup = 0;
- if (!chand->transport_setup_initiated) {
- chand->transport_setup_initiated = 1;
- initiate_transport_setup = 1;
- }
- /* add this call to the waiting set to be resumed once we have a child
- channel stack, growing the waiting set if needed */
- if (chand->waiting_child_count == chand->waiting_child_capacity) {
- chand->waiting_child_capacity =
- GPR_MAX(chand->waiting_child_capacity * 2, 8);
- chand->waiting_children = gpr_realloc(
- chand->waiting_children,
- chand->waiting_child_capacity * sizeof(call_data *));
- }
- calld->s.waiting_op = *op;
- chand->waiting_children[chand->waiting_child_count++] = calld;
- grpc_transport_setup_add_interested_party(chand->transport_setup,
- op->bind_pollset);
- gpr_mu_unlock(&chand->mu);
-
- /* finally initiate transport setup if needed */
- if (initiate_transport_setup) {
- grpc_transport_setup_initiate(chand->transport_setup);
- }
+ consumed_op = merge_into_waiting_op(elem, op);
+ gpr_mu_unlock(&calld->mu_state);
}
+ break;
}
- break;
- case CALL_WAITING:
+ /* fall through */
+ case CALL_CREATED:
if (op->cancel_with_status != GRPC_STATUS_OK) {
- waiting_op = calld->s.waiting_op;
- remove_waiting_child(chand, calld);
calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, &waiting_op);
+ gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, op);
} else {
- GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
- (op->send_ops == NULL));
- GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
- (op->recv_ops == NULL));
- if (op->send_ops) {
- calld->s.waiting_op.send_ops = op->send_ops;
- calld->s.waiting_op.is_last_send = op->is_last_send;
- calld->s.waiting_op.on_done_send = op->on_done_send;
- }
- if (op->recv_ops) {
- calld->s.waiting_op.recv_ops = op->recv_ops;
- calld->s.waiting_op.recv_state = op->recv_state;
- calld->s.waiting_op.on_done_recv = op->on_done_recv;
- }
- gpr_mu_unlock(&chand->mu);
- if (op->on_consumed) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0);
+ calld->waiting_op = *op;
+
+ if (op->send_ops == NULL) {
+ /* need to have some send ops before we can select the
+ lb target */
+ calld->state = CALL_WAITING_FOR_SEND;
+ gpr_mu_unlock(&calld->mu_state);
+ } else {
+ gpr_mu_lock(&chand->mu_config);
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ GRPC_LB_POLICY_REF(lb_policy, "pick");
+ gpr_mu_unlock(&chand->mu_config);
+ calld->state = CALL_WAITING_FOR_PICK;
+ gpr_mu_unlock(&calld->mu_state);
+
+ pick_target(lb_policy, calld);
+
+ GRPC_LB_POLICY_UNREF(lb_policy, "pick");
+ } else if (chand->resolver != NULL) {
+ calld->state = CALL_WAITING_FOR_CONFIG;
+ add_to_lb_policy_wait_queue_locked_state_config(elem);
+ gpr_mu_unlock(&chand->mu_config);
+ gpr_mu_unlock(&calld->mu_state);
+ } else {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu_config);
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(elem, op);
+ }
}
}
break;
- case CALL_CANCELLED:
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, op);
- break;
}
+
+ if (consumed_op != NULL) {
+ consumed_op->cb(consumed_op->cb_arg, 1);
+ }
+}
+
+static void cc_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ perform_transport_stream_op(elem, op, 0);
+}
+
+static void cc_on_config_changed(void *arg, int iomgr_success) {
+ channel_data *chand = arg;
+ grpc_lb_policy *lb_policy = NULL;
+ grpc_lb_policy *old_lb_policy;
+ grpc_resolver *old_resolver;
+ grpc_iomgr_closure *wakeup_closures = NULL;
+
+ if (chand->incoming_configuration != NULL) {
+ lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
+ GRPC_LB_POLICY_REF(lb_policy, "channel");
+
+ grpc_client_config_unref(chand->incoming_configuration);
+ }
+
+ chand->incoming_configuration = NULL;
+
+ gpr_mu_lock(&chand->mu_config);
+ old_lb_policy = chand->lb_policy;
+ chand->lb_policy = lb_policy;
+ if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
+ wakeup_closures = chand->waiting_for_config_closures;
+ chand->waiting_for_config_closures = NULL;
+ }
+ gpr_mu_unlock(&chand->mu_config);
+
+ if (old_lb_policy) {
+ GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
+ }
+
+ gpr_mu_lock(&chand->mu_config);
+ if (iomgr_success && chand->resolver) {
+ grpc_resolver *resolver = chand->resolver;
+ GRPC_RESOLVER_REF(resolver, "channel-next");
+ gpr_mu_unlock(&chand->mu_config);
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ grpc_resolver_next(resolver, &chand->incoming_configuration,
+ &chand->on_config_changed);
+ GRPC_RESOLVER_UNREF(resolver, "channel-next");
+ } else {
+ old_resolver = chand->resolver;
+ chand->resolver = NULL;
+ grpc_connectivity_state_set(&chand->state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE);
+ gpr_mu_unlock(&chand->mu_config);
+ if (old_resolver != NULL) {
+ grpc_resolver_shutdown(old_resolver);
+ GRPC_RESOLVER_UNREF(old_resolver, "channel");
+ }
+ }
+
+ while (wakeup_closures) {
+ grpc_iomgr_closure *next = wakeup_closures->next;
+ grpc_iomgr_add_callback(wakeup_closures);
+ wakeup_closures = next;
+ }
+
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
}
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
+static void cc_start_transport_op(grpc_channel_element *elem,
+ grpc_transport_op *op) {
+ grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
- grpc_child_channel *child_channel;
- grpc_channel_op rop;
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ grpc_resolver *destroy_resolver = NULL;
+ grpc_iomgr_closure *on_consumed = op->on_consumed;
+ op->on_consumed = NULL;
+
+ GPR_ASSERT(op->set_accept_stream == NULL);
+ GPR_ASSERT(op->bind_pollset == NULL);
+
+ gpr_mu_lock(&chand->mu_config);
+ if (op->on_connectivity_state_change != NULL) {
+ grpc_connectivity_state_notify_on_state_change(
+ &chand->state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
+ op->on_connectivity_state_change = NULL;
+ op->connectivity_state = NULL;
+ }
- switch (op->type) {
- case GRPC_CHANNEL_GOAWAY:
- /* sending goaway: clear out the active child on the way through */
- gpr_mu_lock(&chand->mu);
- child_channel = chand->active_child;
- chand->active_child = NULL;
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_handle_op(child_channel, op);
- grpc_child_channel_destroy(child_channel, 1);
- } else {
- gpr_slice_unref(op->data.goaway.message);
- }
- break;
- case GRPC_CHANNEL_DISCONNECT:
- /* sending disconnect: clear out the active child on the way through */
- gpr_mu_lock(&chand->mu);
- child_channel = chand->active_child;
- chand->active_child = NULL;
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_destroy(child_channel, 1);
- }
- /* fake a transport closed to satisfy the refcounting in client */
- rop.type = GRPC_TRANSPORT_CLOSED;
- rop.dir = GRPC_CALL_UP;
- grpc_channel_next_op(elem, &rop);
- break;
- case GRPC_TRANSPORT_GOAWAY:
- /* receiving goaway: if it's from our active child, drop the active child;
- in all cases consume the event here */
- gpr_mu_lock(&chand->mu);
- child_channel = grpc_channel_stack_from_top_element(from_elem);
- if (child_channel == chand->active_child) {
- chand->active_child = NULL;
- } else {
- child_channel = NULL;
- }
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_destroy(child_channel, 0);
- }
- gpr_slice_unref(op->data.goaway.message);
- break;
- case GRPC_TRANSPORT_CLOSED:
- /* receiving disconnect: if it's from our active child, drop the active
- child; in all cases consume the event here */
- gpr_mu_lock(&chand->mu);
- child_channel = grpc_channel_stack_from_top_element(from_elem);
- if (child_channel == chand->active_child) {
- chand->active_child = NULL;
- } else {
- child_channel = NULL;
- }
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_destroy(child_channel, 0);
- }
- break;
- default:
- switch (op->dir) {
- case GRPC_CALL_UP:
- grpc_channel_next_op(elem, op);
- break;
- case GRPC_CALL_DOWN:
- gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type);
- abort();
- break;
- }
- break;
+ if (op->disconnect && chand->resolver != NULL) {
+ grpc_connectivity_state_set(&chand->state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE);
+ destroy_resolver = chand->resolver;
+ chand->resolver = NULL;
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_shutdown(chand->lb_policy);
+ }
+ }
+
+ if (!is_empty(op, sizeof(*op))) {
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ GRPC_LB_POLICY_REF(lb_policy, "broadcast");
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
+
+ if (destroy_resolver) {
+ grpc_resolver_shutdown(destroy_resolver);
+ GRPC_RESOLVER_UNREF(destroy_resolver, "channel");
+ }
+
+ if (lb_policy) {
+ grpc_lb_policy_broadcast(lb_policy, op);
+ GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
+ }
+
+ if (on_consumed) {
+ grpc_iomgr_add_callback(on_consumed);
}
}
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_op *initial_op) {
+ grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
/* TODO(ctiller): is there something useful we can do here? */
@@ -376,6 +515,7 @@ static void init_call_elem(grpc_call_element *elem,
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GPR_ASSERT(server_transport_data == NULL);
+ gpr_mu_init(&calld->mu_state);
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
@@ -384,161 +524,88 @@ static void init_call_elem(grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
+ grpc_subchannel_call *subchannel_call;
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
- gpr_mu_lock(&chand->mu);
+ gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
- gpr_mu_unlock(&chand->mu);
- grpc_child_call_destroy(calld->s.active.child_call);
+ subchannel_call = calld->subchannel_call;
+ gpr_mu_unlock(&calld->mu_state);
+ GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel");
break;
- case CALL_WAITING:
- remove_waiting_child(chand, calld);
- gpr_mu_unlock(&chand->mu);
+ case CALL_CREATED:
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&calld->mu_state);
break;
- default:
- gpr_mu_unlock(&chand->mu);
+ case CALL_WAITING_FOR_PICK:
+ case CALL_WAITING_FOR_CONFIG:
+ case CALL_WAITING_FOR_CALL:
+ case CALL_WAITING_FOR_SEND:
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
break;
}
- GPR_ASSERT(calld->state != CALL_WAITING);
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
channel_data *chand = elem->channel_data;
- GPR_ASSERT(!is_first);
+ memset(chand, 0, sizeof(*chand));
+
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- gpr_mu_init(&chand->mu);
- chand->active_child = NULL;
- chand->waiting_children = NULL;
- chand->waiting_child_count = 0;
- chand->waiting_child_capacity = 0;
- chand->transport_setup = NULL;
- chand->transport_setup_initiated = 0;
- chand->args = grpc_channel_args_copy(args);
+ gpr_mu_init(&chand->mu_config);
chand->mdctx = metadata_context;
+ chand->master = master;
+ grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
+ chand);
+
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE);
}
/* Destructor for channel_data */
static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- grpc_transport_setup_cancel(chand->transport_setup);
-
- if (chand->active_child) {
- grpc_child_channel_destroy(chand->active_child, 1);
- chand->active_child = NULL;
+ if (chand->resolver != NULL) {
+ grpc_resolver_shutdown(chand->resolver);
+ GRPC_RESOLVER_UNREF(chand->resolver, "channel");
}
-
- grpc_channel_args_destroy(chand->args);
-
- gpr_mu_destroy(&chand->mu);
- GPR_ASSERT(chand->waiting_child_count == 0);
- gpr_free(chand->waiting_children);
+ if (chand->lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
+ }
+ gpr_mu_destroy(&chand->mu_config);
}
const grpc_channel_filter grpc_client_channel_filter = {
- cc_start_transport_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "client-channel",
+ cc_start_transport_stream_op,
+ cc_start_transport_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "client-channel",
};
-grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
- grpc_channel_stack *channel_stack, grpc_transport *transport,
- grpc_channel_filter const **channel_filters, size_t num_channel_filters,
- grpc_mdctx *mdctx) {
- /* we just got a new transport: lets create a child channel stack for it */
- grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
- channel_data *chand = elem->channel_data;
- size_t num_child_filters = 2 + num_channel_filters;
- grpc_channel_filter const **child_filters;
- grpc_transport_setup_result result;
- grpc_child_channel *old_active = NULL;
- call_data **waiting_children;
- size_t waiting_child_count;
- size_t i;
- grpc_transport_op *call_ops;
-
- /* build the child filter stack */
- child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
- /* we always need a link back filter to get back to the connected channel */
- child_filters[0] = &grpc_child_channel_top_filter;
- for (i = 0; i < num_channel_filters; i++) {
- child_filters[i + 1] = channel_filters[i];
- }
- /* and we always need a connected channel to talk to the transport */
- child_filters[num_child_filters - 1] = &grpc_connected_channel_filter;
-
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
-
- /* BEGIN LOCKING CHANNEL */
- gpr_mu_lock(&chand->mu);
- chand->transport_setup_initiated = 0;
-
- if (chand->active_child) {
- old_active = chand->active_child;
- }
- chand->active_child = grpc_child_channel_create(
- elem, child_filters, num_child_filters, chand->args, mdctx);
- result =
- grpc_connected_channel_bind_transport(chand->active_child, transport);
-
- /* capture the waiting children - we'll activate them outside the lock
- to avoid re-entrancy problems */
- waiting_children = chand->waiting_children;
- waiting_child_count = chand->waiting_child_count;
- /* bumping up inflight_requests here avoids taking a lock per rpc below */
-
- chand->waiting_children = NULL;
- chand->waiting_child_count = 0;
- chand->waiting_child_capacity = 0;
-
- call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
-
- for (i = 0; i < waiting_child_count; i++) {
- call_ops[i] = waiting_children[i]->s.waiting_op;
- if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
- waiting_children[i] = NULL;
- grpc_transport_op_finish_with_failure(&call_ops[i]);
- }
- }
-
- /* END LOCKING CHANNEL */
- gpr_mu_unlock(&chand->mu);
-
- /* activate any pending operations - this is safe to do as we guarantee one
- and only one write operation per request at the surface api - if we lose
- that guarantee we need to do some curly locking here */
- for (i = 0; i < waiting_child_count; i++) {
- if (waiting_children[i]) {
- complete_activate(waiting_children[i]->elem, &call_ops[i]);
- }
- }
- gpr_free(waiting_children);
- gpr_free(call_ops);
- gpr_free(child_filters);
-
- if (old_active) {
- grpc_child_channel_destroy(old_active, 1);
- }
-
- return result;
-}
-
-void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack,
- grpc_transport_setup *setup) {
+void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver) {
/* post construction initialization: set the transport setup pointer */
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
- GPR_ASSERT(!chand->transport_setup);
- chand->transport_setup = setup;
+ GPR_ASSERT(!chand->resolver);
+ chand->resolver = resolver;
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ GRPC_RESOLVER_REF(resolver, "channel");
+ grpc_resolver_next(resolver, &chand->incoming_configuration,
+ &chand->on_config_changed);
}