aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--src/core/channel/client_channel.c329
1 files changed, 170 insertions, 159 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 9630f6898d..965d4e53dc 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -34,6 +34,7 @@
#include "src/core/channel/client_channel.h"
#include <stdio.h>
+#include <string.h>
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
@@ -75,6 +76,7 @@ typedef enum {
CALL_CREATED,
CALL_WAITING_FOR_CONFIG,
CALL_WAITING_FOR_PICK,
+ CALL_WAITING_FOR_CALL,
CALL_ACTIVE,
CALL_CANCELLED
} call_state;
@@ -87,17 +89,13 @@ struct call_data {
call_state state;
gpr_timespec deadline;
- union {
- struct {
- /* our child call stack */
- grpc_subchannel_call *subchannel_call;
- } active;
- grpc_transport_stream_op waiting_op;
- struct {
- grpc_linked_mdelem status;
- grpc_linked_mdelem details;
- } cancelled;
- } s;
+ grpc_subchannel *picked_channel;
+ grpc_iomgr_closure async_setup_task;
+ grpc_transport_stream_op waiting_op;
+ /* our child call stack */
+ grpc_subchannel_call *subchannel_call;
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem details;
};
#if 0
@@ -110,9 +108,9 @@ static int prepare_activate(grpc_call_element *elem,
/* no more access to calld->s.waiting allowed */
GPR_ASSERT(calld->state == CALL_WAITING);
- if (calld->s.waiting_op.bind_pollset) {
+ if (calld->waiting_op.bind_pollset) {
grpc_transport_setup_del_interested_party(chand->transport_setup,
- calld->s.waiting_op.bind_pollset);
+ calld->waiting_op.bind_pollset);
}
calld->state = CALL_ACTIVE;
@@ -143,7 +141,7 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
if (chand->waiting_children[i] == calld) {
grpc_transport_setup_del_interested_party(
- chand->transport_setup, calld->s.waiting_op.bind_pollset);
+ chand->transport_setup, calld->waiting_op.bind_pollset);
continue;
}
chand->waiting_children[new_count++] = chand->waiting_children[i];
@@ -166,15 +164,15 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
char status[GPR_LTOA_MIN_BUFSIZE];
grpc_metadata_batch mdb;
gpr_ltoa(GRPC_STATUS_CANCELLED, status);
- calld->s.cancelled.status.md =
+ calld->status.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
- calld->s.cancelled.details.md =
+ calld->details.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
- calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
- calld->s.cancelled.status.next = &calld->s.cancelled.details;
- calld->s.cancelled.details.prev = &calld->s.cancelled.status;
- mdb.list.head = &calld->s.cancelled.status;
- mdb.list.tail = &calld->s.cancelled.details;
+ calld->status.prev = calld->details.next = NULL;
+ calld->status.next = &calld->details;
+ calld->details.prev = &calld->status;
+ mdb.list.head = &calld->status;
+ mdb.list.tail = &calld->details;
mdb.garbage.head = mdb.garbage.tail = NULL;
mdb.deadline = gpr_inf_future;
grpc_sopb_add_metadata(op->recv_ops, mdb);
@@ -186,16 +184,111 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
}
}
-static void add_to_lb_policy_wait_queue_locked_state_config(channel_data *chand, call_data *calld) {
- abort();
+typedef struct {
+ grpc_iomgr_closure closure;
+ grpc_call_element *elem;
+} waiting_call;
+
+static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation);
+
+static void continue_with_pick(void *arg, int iomgr_success) {
+ waiting_call *wc = arg;
+ call_data *calld = wc->elem->call_data;
+ perform_transport_stream_op(wc->elem, &calld->waiting_op, 1);
+ gpr_free(wc);
+}
+
+static void add_to_lb_policy_wait_queue_locked_state_config(grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ waiting_call *wc = gpr_malloc(sizeof(*wc));
+ grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
+ wc->elem = elem;
+ wc->closure.next = chand->waiting_for_config_closures;
+ chand->waiting_for_config_closures = &wc->closure;
+}
+
+static int is_empty(void *p, int len) {
+ char *ptr = p;
+ int i;
+ for (i = 0; i < len; i++) {
+ if (ptr[i] != 0) return 0;
+ }
+ return 1;
+}
+
+static void started_call(void *arg, int iomgr_success) {
+ call_data *calld = arg;
+ grpc_transport_stream_op op;
+ int have_waiting;
+
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_subchannel_call_process_op(calld->subchannel_call, &op);
+ } else if (calld->state == CALL_WAITING_FOR_CALL) {
+ have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
+ if (calld->subchannel_call != NULL) {
+ calld->state = CALL_ACTIVE;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ grpc_subchannel_call_process_op(calld->subchannel_call, &calld->waiting_op);
+ }
+ } else {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+ }
+ }
+ } else {
+ GPR_ASSERT(calld->state == CALL_CANCELLED);
+ }
+}
+
+static void picked_target(void *arg, int iomgr_success) {
+ call_data *calld = arg;
+ channel_data *chand = calld->elem->channel_data;
+ grpc_transport_stream_op op;
+
+ if (calld->picked_channel == NULL) {
+ /* treat this like a cancellation */
+ calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
+ perform_transport_stream_op(calld->elem, &calld->waiting_op, 1);
+ } else {
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_CANCELLED) {
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+ } else {
+ GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
+ calld->state = CALL_WAITING_FOR_CALL;
+ op = calld->waiting_op;
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
+ grpc_subchannel_create_call(calld->picked_channel, chand->mdctx, &op, &calld->subchannel_call, &calld->async_setup_task);
+ }
+ }
}
static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
- abort();
+ grpc_metadata_batch *initial_metadata;
+ grpc_transport_stream_op *op = &calld->waiting_op;
+
+ GPR_ASSERT(op->bind_pollset);
+ GPR_ASSERT(op->send_ops);
+ GPR_ASSERT(op->send_ops->nops >= 1);
+ GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
+ initial_metadata = &op->send_ops->ops[0].data.metadata;
+
+ grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
+ grpc_lb_policy_pick(lb_policy, op->bind_pollset,
+ initial_metadata, &calld->picked_channel, &calld->async_setup_task);
}
-static void cc_start_transport_stream_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
@@ -206,7 +299,8 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
- subchannel_call = calld->s.active.subchannel_call;
+ GPR_ASSERT(!continuation);
+ subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_process_op(subchannel_call, op);
break;
@@ -214,13 +308,44 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, op);
break;
+ case CALL_WAITING_FOR_CONFIG:
+ case CALL_WAITING_FOR_PICK:
+ case CALL_WAITING_FOR_CALL:
+ if (!continuation) {
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(elem, op);
+ } else {
+ GPR_ASSERT((calld->waiting_op.send_ops == NULL) !=
+ (op->send_ops == NULL));
+ GPR_ASSERT((calld->waiting_op.recv_ops == NULL) !=
+ (op->recv_ops == NULL));
+ if (op->send_ops != NULL) {
+ calld->waiting_op.send_ops = op->send_ops;
+ calld->waiting_op.is_last_send = op->is_last_send;
+ calld->waiting_op.on_done_send = op->on_done_send;
+ }
+ if (op->recv_ops != NULL) {
+ calld->waiting_op.recv_ops = op->recv_ops;
+ calld->waiting_op.recv_state = op->recv_state;
+ calld->waiting_op.on_done_recv = op->on_done_recv;
+ }
+ gpr_mu_unlock(&calld->mu_state);
+ if (op->on_consumed != NULL) {
+ op->on_consumed->cb(op->on_consumed->cb_arg, 0);
+ }
+ }
+ break;
+ }
+ /* fall through */
case CALL_CREATED:
if (op->cancel_with_status != GRPC_STATUS_OK) {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, op);
} else {
- calld->s.waiting_op = *op;
+ calld->waiting_op = *op;
gpr_mu_lock(&chand->mu_config);
lb_policy = chand->lb_policy;
@@ -235,141 +360,22 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
grpc_lb_policy_unref(lb_policy);
} else {
calld->state = CALL_WAITING_FOR_CONFIG;
- add_to_lb_policy_wait_queue_locked_state_config(chand, calld);
+ add_to_lb_policy_wait_queue_locked_state_config(elem);
gpr_mu_unlock(&chand->mu_config);
gpr_mu_unlock(&calld->mu_state);
}
}
break;
- case CALL_WAITING_FOR_CONFIG:
- case CALL_WAITING_FOR_PICK:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(elem, op);
- } else {
- GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
- (op->send_ops == NULL));
- GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
- (op->recv_ops == NULL));
- if (op->send_ops != NULL) {
- calld->s.waiting_op.send_ops = op->send_ops;
- calld->s.waiting_op.is_last_send = op->is_last_send;
- calld->s.waiting_op.on_done_send = op->on_done_send;
- }
- if (op->recv_ops != NULL) {
- calld->s.waiting_op.recv_ops = op->recv_ops;
- calld->s.waiting_op.recv_state = op->recv_state;
- calld->s.waiting_op.on_done_recv = op->on_done_recv;
- }
- gpr_mu_unlock(&calld->mu_state);
- if (op->on_consumed != NULL) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0);
- }
- }
- break;
}
+}
-
-
-
-#if 0
- gpr_mu_lock(&chand->mu);
- switch (calld->state) {
- case CALL_ACTIVE:
- child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
- gpr_mu_unlock(&chand->mu);
- child_elem->filter->start_transport_op(child_elem, op);
- break;
- case CALL_CREATED:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, op);
- } else {
- calld->state = CALL_WAITING;
- calld->s.waiting_op.bind_pollset = NULL;
- if (chand->active_child) {
- /* channel is connected - use the connected stack */
- if (prepare_activate(elem, chand->active_child)) {
- gpr_mu_unlock(&chand->mu);
- /* activate the request (pass it down) outside the lock */
- complete_activate(elem, op);
- } else {
- gpr_mu_unlock(&chand->mu);
- }
- } else {
- /* check to see if we should initiate a connection (if we're not
- already),
- but don't do so until outside the lock to avoid re-entrancy
- problems if
- the callback is immediate */
- int initiate_transport_setup = 0;
- if (!chand->transport_setup_initiated) {
- chand->transport_setup_initiated = 1;
- initiate_transport_setup = 1;
- }
- /* add this call to the waiting set to be resumed once we have a child
- channel stack, growing the waiting set if needed */
- if (chand->waiting_child_count == chand->waiting_child_capacity) {
- chand->waiting_child_capacity =
- GPR_MAX(chand->waiting_child_capacity * 2, 8);
- chand->waiting_children = gpr_realloc(
- chand->waiting_children,
- chand->waiting_child_capacity * sizeof(call_data *));
- }
- calld->s.waiting_op = *op;
- chand->waiting_children[chand->waiting_child_count++] = calld;
- grpc_transport_setup_add_interested_party(chand->transport_setup,
- op->bind_pollset);
- gpr_mu_unlock(&chand->mu);
-
- /* finally initiate transport setup if needed */
- if (initiate_transport_setup) {
- grpc_transport_setup_initiate(chand->transport_setup);
- }
- }
- }
- break;
- case CALL_WAITING:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- waiting_op = calld->s.waiting_op;
- remove_waiting_child(chand, calld);
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, &waiting_op);
- handle_op_after_cancellation(elem, op);
- } else {
- GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
- (op->send_ops == NULL));
- GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
- (op->recv_ops == NULL));
- if (op->send_ops) {
- calld->s.waiting_op.send_ops = op->send_ops;
- calld->s.waiting_op.is_last_send = op->is_last_send;
- calld->s.waiting_op.on_done_send = op->on_done_send;
- }
- if (op->recv_ops) {
- calld->s.waiting_op.recv_ops = op->recv_ops;
- calld->s.waiting_op.recv_state = op->recv_state;
- calld->s.waiting_op.on_done_recv = op->on_done_recv;
- }
- gpr_mu_unlock(&chand->mu);
- if (op->on_consumed) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0);
- }
- }
- break;
- case CALL_CANCELLED:
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, op);
- break;
- }
-#endif
+static void cc_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ perform_transport_stream_op(elem, op, 0);
}
static void update_state_locked(channel_data *chand) {
-
+ gpr_log(GPR_ERROR, "update_state_locked not implemented");
}
static void cc_on_config_changed(void *arg, int iomgr_success) {
@@ -382,9 +388,10 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (chand->incoming_configuration) {
lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
grpc_lb_policy_ref(lb_policy);
+
+ grpc_client_config_unref(chand->incoming_configuration);
}
- grpc_client_config_unref(chand->incoming_configuration);
chand->incoming_configuration = NULL;
gpr_mu_lock(&chand->mu_config);
@@ -402,7 +409,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
wakeup_closures = next;
}
- grpc_lb_policy_unref(old_lb_policy);
+ if (old_lb_policy) {
+ grpc_lb_policy_unref(old_lb_policy);
+ }
if (iomgr_success) {
grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
@@ -511,6 +520,7 @@ static void init_call_elem(grpc_call_element *elem,
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GPR_ASSERT(server_transport_data == NULL);
+ gpr_mu_init(&calld->mu_state);
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
@@ -527,7 +537,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
- subchannel_call = calld->s.active.subchannel_call;
+ subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_unref(subchannel_call);
break;
@@ -537,6 +547,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
break;
case CALL_WAITING_FOR_PICK:
case CALL_WAITING_FOR_CONFIG:
+ case CALL_WAITING_FOR_CALL:
gpr_log(GPR_ERROR, "should never reach here");
abort();
break;
@@ -550,12 +561,12 @@ static void init_channel_elem(grpc_channel_element *elem,
int is_last) {
channel_data *chand = elem->channel_data;
- GPR_ASSERT(!is_first);
+ memset(chand, 0, sizeof(*chand));
+
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
gpr_mu_init(&chand->mu_config);
- chand->resolver = NULL;
chand->mdctx = metadata_context;
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
}
@@ -633,7 +644,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
- call_ops[i] = waiting_children[i]->s.waiting_op;
+ call_ops[i] = waiting_children[i]->waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
grpc_transport_stream_op_finish_with_failure(&call_ops[i]);