aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--src/core/channel/client_channel.c313
1 files changed, 130 insertions, 183 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 9791f98be8..78f8d06d89 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -38,7 +38,6 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
-#include "src/core/channel/metadata_buffer.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
@@ -59,6 +58,7 @@ typedef struct {
/* the sending child (may be null) */
grpc_child_channel *active_child;
+ grpc_mdctx *mdctx;
/* calls waiting for a channel to be ready */
call_data **waiting_children;
@@ -70,9 +70,6 @@ typedef struct {
int transport_setup_initiated;
grpc_channel_args *args;
-
- /* metadata cache */
- grpc_mdelem *cancel_status;
} channel_data;
typedef enum {
@@ -87,19 +84,17 @@ struct call_data {
grpc_call_element *elem;
call_state state;
- grpc_metadata_buffer pending_metadata;
gpr_timespec deadline;
union {
struct {
/* our child call stack */
grpc_child_call *child_call;
} active;
+ grpc_transport_op waiting_op;
struct {
- void (*on_complete)(void *user_data, grpc_op_error error);
- void *on_complete_user_data;
- gpr_uint32 start_flags;
- grpc_pollset *pollset;
- } waiting;
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem details;
+ } cancelled;
} s;
};
@@ -113,89 +108,23 @@ static int prepare_activate(grpc_call_element *elem,
calld->state = CALL_ACTIVE;
/* create a child call */
- calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem);
+ /* TODO(ctiller): pass the waiting op down here */
+ calld->s.active.child_call =
+ grpc_child_channel_create_call(on_child, elem, NULL);
return 1;
}
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
-static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
+static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
GPR_ASSERT(calld->state == CALL_ACTIVE);
- /* sending buffered metadata down the stack before the start call */
- grpc_metadata_buffer_flush(&calld->pending_metadata, child_elem);
-
- if (gpr_time_cmp(calld->deadline, gpr_inf_future) != 0) {
- grpc_call_op dop;
- dop.type = GRPC_SEND_DEADLINE;
- dop.dir = GRPC_CALL_DOWN;
- dop.flags = 0;
- dop.data.deadline = calld->deadline;
- dop.done_cb = do_nothing;
- dop.user_data = NULL;
- child_elem->filter->call_op(child_elem, elem, &dop);
- }
-
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
- child_elem->filter->call_op(child_elem, elem, op);
-}
-
-static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->mu);
- if (calld->state == CALL_CANCELLED) {
- gpr_mu_unlock(&chand->mu);
- op->done_cb(op->user_data, GRPC_OP_ERROR);
- return;
- }
- GPR_ASSERT(calld->state == CALL_CREATED);
- calld->state = CALL_WAITING;
- if (chand->active_child) {
- /* channel is connected - use the connected stack */
- if (prepare_activate(elem, chand->active_child)) {
- gpr_mu_unlock(&chand->mu);
- /* activate the request (pass it down) outside the lock */
- complete_activate(elem, op);
- } else {
- gpr_mu_unlock(&chand->mu);
- }
- } else {
- /* check to see if we should initiate a connection (if we're not already),
- but don't do so until outside the lock to avoid re-entrancy problems if
- the callback is immediate */
- int initiate_transport_setup = 0;
- if (!chand->transport_setup_initiated) {
- chand->transport_setup_initiated = 1;
- initiate_transport_setup = 1;
- }
- /* add this call to the waiting set to be resumed once we have a child
- channel stack, growing the waiting set if needed */
- if (chand->waiting_child_count == chand->waiting_child_capacity) {
- chand->waiting_child_capacity =
- GPR_MAX(chand->waiting_child_capacity * 2, 8);
- chand->waiting_children =
- gpr_realloc(chand->waiting_children,
- chand->waiting_child_capacity * sizeof(call_data *));
- }
- calld->s.waiting.on_complete = op->done_cb;
- calld->s.waiting.on_complete_user_data = op->user_data;
- calld->s.waiting.start_flags = op->flags;
- calld->s.waiting.pollset = op->data.start.pollset;
- chand->waiting_children[chand->waiting_child_count++] = calld;
- gpr_mu_unlock(&chand->mu);
-
- /* finally initiate transport setup if needed */
- if (initiate_transport_setup) {
- grpc_transport_setup_initiate(chand->transport_setup);
- }
- }
+ child_elem->filter->start_transport_op(child_elem, op);
}
static void remove_waiting_child(channel_data *chand, call_data *calld) {
@@ -210,94 +139,128 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
chand->waiting_child_count = new_count;
}
-static void send_up_cancelled_ops(grpc_call_element *elem) {
- grpc_call_op finish_op;
+static void handle_op_after_cancellation(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- /* send up a synthesized status */
- finish_op.type = GRPC_RECV_METADATA;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status);
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
- /* send up a finish */
- finish_op.type = GRPC_RECV_FINISH;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
+ if (op->send_ops) {
+ op->on_done_send(op->send_user_data, 0);
+ }
+ if (op->recv_ops) {
+ char status[GPR_LTOA_MIN_BUFSIZE];
+ grpc_metadata_batch mdb;
+ gpr_ltoa(GRPC_STATUS_CANCELLED, status);
+ calld->s.cancelled.status.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
+ calld->s.cancelled.details.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
+ calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
+ calld->s.cancelled.status.next = &calld->s.cancelled.details;
+ calld->s.cancelled.details.prev = &calld->s.cancelled.status;
+ mdb.list.head = &calld->s.cancelled.status;
+ mdb.list.tail = &calld->s.cancelled.details;
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = gpr_inf_future;
+ grpc_sopb_add_metadata(op->recv_ops, mdb);
+ *op->recv_state = GRPC_STREAM_CLOSED;
+ op->on_done_recv(op->recv_user_data, 1);
+ }
}
-static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
+static void cc_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
+ grpc_transport_op waiting_op;
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
gpr_mu_lock(&chand->mu);
switch (calld->state) {
case CALL_ACTIVE:
child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu);
- child_elem->filter->call_op(child_elem, elem, op);
- return; /* early out */
- case CALL_WAITING:
- remove_waiting_child(chand, calld);
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- send_up_cancelled_ops(elem);
- calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data,
- GRPC_OP_ERROR);
- return; /* early out */
- case CALL_CREATED:
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- send_up_cancelled_ops(elem);
- return; /* early out */
- case CALL_CANCELLED:
- gpr_mu_unlock(&chand->mu);
- return; /* early out */
- }
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
-}
-
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- call_data *calld = elem->call_data;
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_metadata_buffer_queue(&calld->pending_metadata, op);
- break;
- case GRPC_SEND_DEADLINE:
- calld->deadline = op->data.deadline;
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_SEND_START:
- /* filter out the start event to find which child to send on */
- start_rpc(elem, op);
+ child_elem->filter->start_transport_op(child_elem, op);
break;
- case GRPC_CANCEL_OP:
- cancel_rpc(elem, op);
+ case CALL_CREATED:
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, op);
+ } else {
+ calld->state = CALL_WAITING;
+ if (chand->active_child) {
+ /* channel is connected - use the connected stack */
+ if (prepare_activate(elem, chand->active_child)) {
+ gpr_mu_unlock(&chand->mu);
+ /* activate the request (pass it down) outside the lock */
+ complete_activate(elem, op);
+ } else {
+ gpr_mu_unlock(&chand->mu);
+ }
+ } else {
+ /* check to see if we should initiate a connection (if we're not
+ already),
+ but don't do so until outside the lock to avoid re-entrancy
+ problems if
+ the callback is immediate */
+ int initiate_transport_setup = 0;
+ if (!chand->transport_setup_initiated) {
+ chand->transport_setup_initiated = 1;
+ initiate_transport_setup = 1;
+ }
+ /* add this call to the waiting set to be resumed once we have a child
+ channel stack, growing the waiting set if needed */
+ if (chand->waiting_child_count == chand->waiting_child_capacity) {
+ chand->waiting_child_capacity =
+ GPR_MAX(chand->waiting_child_capacity * 2, 8);
+ chand->waiting_children = gpr_realloc(
+ chand->waiting_children,
+ chand->waiting_child_capacity * sizeof(call_data *));
+ }
+ calld->s.waiting_op = *op;
+ chand->waiting_children[chand->waiting_child_count++] = calld;
+ gpr_mu_unlock(&chand->mu);
+
+ /* finally initiate transport setup if needed */
+ if (initiate_transport_setup) {
+ grpc_transport_setup_initiate(chand->transport_setup);
+ }
+ }
+ }
break;
- case GRPC_SEND_MESSAGE:
- case GRPC_SEND_FINISH:
- case GRPC_REQUEST_DATA:
- if (calld->state == CALL_ACTIVE) {
- grpc_call_element *child_elem =
- grpc_child_call_get_top_element(calld->s.active.child_call);
- child_elem->filter->call_op(child_elem, elem, op);
+ case CALL_WAITING:
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ waiting_op = calld->s.waiting_op;
+ remove_waiting_child(chand, calld);
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, &waiting_op);
+ handle_op_after_cancellation(elem, op);
} else {
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
+ (op->send_ops == NULL));
+ GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
+ (op->recv_ops == NULL));
+ if (op->send_ops) {
+ calld->s.waiting_op.send_ops = op->send_ops;
+ calld->s.waiting_op.is_last_send = op->is_last_send;
+ calld->s.waiting_op.on_done_send = op->on_done_send;
+ calld->s.waiting_op.send_user_data = op->send_user_data;
+ }
+ if (op->recv_ops) {
+ calld->s.waiting_op.recv_ops = op->recv_ops;
+ calld->s.waiting_op.recv_state = op->recv_state;
+ calld->s.waiting_op.on_done_recv = op->on_done_recv;
+ calld->s.waiting_op.recv_user_data = op->recv_user_data;
+ }
+ gpr_mu_unlock(&chand->mu);
}
break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, op);
break;
}
}
@@ -382,39 +345,33 @@ static void channel_op(grpc_channel_element *elem,
}
}
-static void error_bad_on_complete(void *arg, grpc_op_error error) {
- gpr_log(GPR_ERROR,
- "Waiting finished but not started? Bad on_complete callback");
- abort();
-}
-
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
+ /* TODO(ctiller): is there something useful we can do here? */
+ GPR_ASSERT(initial_op == NULL);
+
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GPR_ASSERT(server_transport_data == NULL);
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
- calld->s.waiting.on_complete = error_bad_on_complete;
- calld->s.waiting.on_complete_user_data = NULL;
- grpc_metadata_buffer_init(&calld->pending_metadata);
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- /* if the metadata buffer is not flushed, destroy it here. */
- grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK);
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
if (calld->state == CALL_ACTIVE) {
grpc_child_call_destroy(calld->s.active.child_call);
}
+ GPR_ASSERT(calld->state != CALL_WAITING);
}
/* Constructor for channel_data */
@@ -423,7 +380,6 @@ static void init_channel_elem(grpc_channel_element *elem,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
channel_data *chand = elem->channel_data;
- char temp[GPR_LTOA_MIN_BUFSIZE];
GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
@@ -437,10 +393,7 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->transport_setup = NULL;
chand->transport_setup_initiated = 0;
chand->args = grpc_channel_args_copy(args);
-
- gpr_ltoa(GRPC_STATUS_CANCELLED, temp);
- chand->cancel_status =
- grpc_mdelem_from_strings(metadata_context, "grpc-status", temp);
+ chand->mdctx = metadata_context;
}
/* Destructor for channel_data */
@@ -455,7 +408,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
grpc_channel_args_destroy(chand->args);
- grpc_mdelem_unref(chand->cancel_status);
gpr_mu_destroy(&chand->mu);
GPR_ASSERT(chand->waiting_child_count == 0);
@@ -463,9 +415,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_channel_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "client-channel", };
+ cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "client-channel",
+};
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
grpc_channel_stack *channel_stack, grpc_transport *transport,
@@ -481,7 +434,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
- grpc_call_op *call_ops;
+ grpc_transport_op *call_ops;
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
@@ -517,19 +470,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
chand->waiting_child_count = 0;
chand->waiting_child_capacity = 0;
- call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
+ call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
- call_ops[i].type = GRPC_SEND_START;
- call_ops[i].dir = GRPC_CALL_DOWN;
- call_ops[i].flags = waiting_children[i]->s.waiting.start_flags;
- call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
- call_ops[i].user_data =
- waiting_children[i]->s.waiting.on_complete_user_data;
- call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset;
+ call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
- call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(&call_ops[i]);
}
}