diff options
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r-- | src/core/channel/client_channel.c | 313 |
1 files changed, 130 insertions, 183 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 9791f98be8..78f8d06d89 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -38,7 +38,6 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/child_channel.h" #include "src/core/channel/connected_channel.h" -#include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/iomgr.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> @@ -59,6 +58,7 @@ typedef struct { /* the sending child (may be null) */ grpc_child_channel *active_child; + grpc_mdctx *mdctx; /* calls waiting for a channel to be ready */ call_data **waiting_children; @@ -70,9 +70,6 @@ typedef struct { int transport_setup_initiated; grpc_channel_args *args; - - /* metadata cache */ - grpc_mdelem *cancel_status; } channel_data; typedef enum { @@ -87,19 +84,17 @@ struct call_data { grpc_call_element *elem; call_state state; - grpc_metadata_buffer pending_metadata; gpr_timespec deadline; union { struct { /* our child call stack */ grpc_child_call *child_call; } active; + grpc_transport_op waiting_op; struct { - void (*on_complete)(void *user_data, grpc_op_error error); - void *on_complete_user_data; - gpr_uint32 start_flags; - grpc_pollset *pollset; - } waiting; + grpc_linked_mdelem status; + grpc_linked_mdelem details; + } cancelled; } s; }; @@ -113,89 +108,23 @@ static int prepare_activate(grpc_call_element *elem, calld->state = CALL_ACTIVE; /* create a child call */ - calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem); + /* TODO(ctiller): pass the waiting op down here */ + calld->s.active.child_call = + grpc_child_channel_create_call(on_child, elem, NULL); return 1; } -static void do_nothing(void *ignored, grpc_op_error error) {} - -static void complete_activate(grpc_call_element *elem, grpc_call_op *op) { +static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; grpc_call_element *child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); GPR_ASSERT(calld->state == CALL_ACTIVE); - /* sending buffered metadata down the stack before the start call */ - grpc_metadata_buffer_flush(&calld->pending_metadata, child_elem); - - if (gpr_time_cmp(calld->deadline, gpr_inf_future) != 0) { - grpc_call_op dop; - dop.type = GRPC_SEND_DEADLINE; - dop.dir = GRPC_CALL_DOWN; - dop.flags = 0; - dop.data.deadline = calld->deadline; - dop.done_cb = do_nothing; - dop.user_data = NULL; - child_elem->filter->call_op(child_elem, elem, &dop); - } - /* continue the start call down the stack, this nees to happen after metadata are flushed*/ - child_elem->filter->call_op(child_elem, elem, op); -} - -static void start_rpc(grpc_call_element *elem, grpc_call_op *op) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->mu); - if (calld->state == CALL_CANCELLED) { - gpr_mu_unlock(&chand->mu); - op->done_cb(op->user_data, GRPC_OP_ERROR); - return; - } - GPR_ASSERT(calld->state == CALL_CREATED); - calld->state = CALL_WAITING; - if (chand->active_child) { - /* channel is connected - use the connected stack */ - if (prepare_activate(elem, chand->active_child)) { - gpr_mu_unlock(&chand->mu); - /* activate the request (pass it down) outside the lock */ - complete_activate(elem, op); - } else { - gpr_mu_unlock(&chand->mu); - } - } else { - /* check to see if we should initiate a connection (if we're not already), - but don't do so until outside the lock to avoid re-entrancy problems if - the callback is immediate */ - int initiate_transport_setup = 0; - if (!chand->transport_setup_initiated) { - chand->transport_setup_initiated = 1; - initiate_transport_setup = 1; - } - /* add this call to the waiting set to be resumed once we have a child - channel stack, growing the waiting set if needed */ - if (chand->waiting_child_count == chand->waiting_child_capacity) { - chand->waiting_child_capacity = - GPR_MAX(chand->waiting_child_capacity * 2, 8); - chand->waiting_children = - gpr_realloc(chand->waiting_children, - chand->waiting_child_capacity * sizeof(call_data *)); - } - calld->s.waiting.on_complete = op->done_cb; - calld->s.waiting.on_complete_user_data = op->user_data; - calld->s.waiting.start_flags = op->flags; - calld->s.waiting.pollset = op->data.start.pollset; - chand->waiting_children[chand->waiting_child_count++] = calld; - gpr_mu_unlock(&chand->mu); - - /* finally initiate transport setup if needed */ - if (initiate_transport_setup) { - grpc_transport_setup_initiate(chand->transport_setup); - } - } + child_elem->filter->start_transport_op(child_elem, op); } static void remove_waiting_child(channel_data *chand, call_data *calld) { @@ -210,94 +139,128 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { chand->waiting_child_count = new_count; } -static void send_up_cancelled_ops(grpc_call_element *elem) { - grpc_call_op finish_op; +static void handle_op_after_cancellation(grpc_call_element *elem, + grpc_transport_op *op) { + call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - /* send up a synthesized status */ - finish_op.type = GRPC_RECV_METADATA; - finish_op.dir = GRPC_CALL_UP; - finish_op.flags = 0; - finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status); - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - grpc_call_next_op(elem, &finish_op); - /* send up a finish */ - finish_op.type = GRPC_RECV_FINISH; - finish_op.dir = GRPC_CALL_UP; - finish_op.flags = 0; - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - grpc_call_next_op(elem, &finish_op); + if (op->send_ops) { + op->on_done_send(op->send_user_data, 0); + } + if (op->recv_ops) { + char status[GPR_LTOA_MIN_BUFSIZE]; + grpc_metadata_batch mdb; + gpr_ltoa(GRPC_STATUS_CANCELLED, status); + calld->s.cancelled.status.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); + calld->s.cancelled.details.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); + calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL; + calld->s.cancelled.status.next = &calld->s.cancelled.details; + calld->s.cancelled.details.prev = &calld->s.cancelled.status; + mdb.list.head = &calld->s.cancelled.status; + mdb.list.tail = &calld->s.cancelled.details; + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = gpr_inf_future; + grpc_sopb_add_metadata(op->recv_ops, mdb); + *op->recv_state = GRPC_STREAM_CLOSED; + op->on_done_recv(op->recv_user_data, 1); + } } -static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { +static void cc_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_call_element *child_elem; + grpc_transport_op waiting_op; + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); gpr_mu_lock(&chand->mu); switch (calld->state) { case CALL_ACTIVE: child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); gpr_mu_unlock(&chand->mu); - child_elem->filter->call_op(child_elem, elem, op); - return; /* early out */ - case CALL_WAITING: - remove_waiting_child(chand, calld); - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&chand->mu); - send_up_cancelled_ops(elem); - calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data, - GRPC_OP_ERROR); - return; /* early out */ - case CALL_CREATED: - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&chand->mu); - send_up_cancelled_ops(elem); - return; /* early out */ - case CALL_CANCELLED: - gpr_mu_unlock(&chand->mu); - return; /* early out */ - } - gpr_log(GPR_ERROR, "should never reach here"); - abort(); -} - -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { - call_data *calld = elem->call_data; - GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - switch (op->type) { - case GRPC_SEND_METADATA: - grpc_metadata_buffer_queue(&calld->pending_metadata, op); - break; - case GRPC_SEND_DEADLINE: - calld->deadline = op->data.deadline; - op->done_cb(op->user_data, GRPC_OP_OK); - break; - case GRPC_SEND_START: - /* filter out the start event to find which child to send on */ - start_rpc(elem, op); + child_elem->filter->start_transport_op(child_elem, op); break; - case GRPC_CANCEL_OP: - cancel_rpc(elem, op); + case CALL_CREATED: + if (op->cancel_with_status != GRPC_STATUS_OK) { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&chand->mu); + handle_op_after_cancellation(elem, op); + } else { + calld->state = CALL_WAITING; + if (chand->active_child) { + /* channel is connected - use the connected stack */ + if (prepare_activate(elem, chand->active_child)) { + gpr_mu_unlock(&chand->mu); + /* activate the request (pass it down) outside the lock */ + complete_activate(elem, op); + } else { + gpr_mu_unlock(&chand->mu); + } + } else { + /* check to see if we should initiate a connection (if we're not + already), + but don't do so until outside the lock to avoid re-entrancy + problems if + the callback is immediate */ + int initiate_transport_setup = 0; + if (!chand->transport_setup_initiated) { + chand->transport_setup_initiated = 1; + initiate_transport_setup = 1; + } + /* add this call to the waiting set to be resumed once we have a child + channel stack, growing the waiting set if needed */ + if (chand->waiting_child_count == chand->waiting_child_capacity) { + chand->waiting_child_capacity = + GPR_MAX(chand->waiting_child_capacity * 2, 8); + chand->waiting_children = gpr_realloc( + chand->waiting_children, + chand->waiting_child_capacity * sizeof(call_data *)); + } + calld->s.waiting_op = *op; + chand->waiting_children[chand->waiting_child_count++] = calld; + gpr_mu_unlock(&chand->mu); + + /* finally initiate transport setup if needed */ + if (initiate_transport_setup) { + grpc_transport_setup_initiate(chand->transport_setup); + } + } + } break; - case GRPC_SEND_MESSAGE: - case GRPC_SEND_FINISH: - case GRPC_REQUEST_DATA: - if (calld->state == CALL_ACTIVE) { - grpc_call_element *child_elem = - grpc_child_call_get_top_element(calld->s.active.child_call); - child_elem->filter->call_op(child_elem, elem, op); + case CALL_WAITING: + if (op->cancel_with_status != GRPC_STATUS_OK) { + waiting_op = calld->s.waiting_op; + remove_waiting_child(chand, calld); + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&chand->mu); + handle_op_after_cancellation(elem, &waiting_op); + handle_op_after_cancellation(elem, op); } else { - op->done_cb(op->user_data, GRPC_OP_ERROR); + GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != + (op->send_ops == NULL)); + GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != + (op->recv_ops == NULL)); + if (op->send_ops) { + calld->s.waiting_op.send_ops = op->send_ops; + calld->s.waiting_op.is_last_send = op->is_last_send; + calld->s.waiting_op.on_done_send = op->on_done_send; + calld->s.waiting_op.send_user_data = op->send_user_data; + } + if (op->recv_ops) { + calld->s.waiting_op.recv_ops = op->recv_ops; + calld->s.waiting_op.recv_state = op->recv_state; + calld->s.waiting_op.on_done_recv = op->on_done_recv; + calld->s.waiting_op.recv_user_data = op->recv_user_data; + } + gpr_mu_unlock(&chand->mu); } break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_UP); - grpc_call_next_op(elem, op); + case CALL_CANCELLED: + gpr_mu_unlock(&chand->mu); + handle_op_after_cancellation(elem, op); break; } } @@ -382,39 +345,33 @@ static void channel_op(grpc_channel_element *elem, } } -static void error_bad_on_complete(void *arg, grpc_op_error error) { - gpr_log(GPR_ERROR, - "Waiting finished but not started? Bad on_complete callback"); - abort(); -} - /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, + grpc_transport_op *initial_op) { call_data *calld = elem->call_data; + /* TODO(ctiller): is there something useful we can do here? */ + GPR_ASSERT(initial_op == NULL); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GPR_ASSERT(server_transport_data == NULL); calld->elem = elem; calld->state = CALL_CREATED; calld->deadline = gpr_inf_future; - calld->s.waiting.on_complete = error_bad_on_complete; - calld->s.waiting.on_complete_user_data = NULL; - grpc_metadata_buffer_init(&calld->pending_metadata); } /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; - /* if the metadata buffer is not flushed, destroy it here. */ - grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK); /* if the call got activated, we need to destroy the child stack also, and remove it from the in-flight requests tracked by the child_entry we picked */ if (calld->state == CALL_ACTIVE) { grpc_child_call_destroy(calld->s.active.child_call); } + GPR_ASSERT(calld->state != CALL_WAITING); } /* Constructor for channel_data */ @@ -423,7 +380,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_mdctx *metadata_context, int is_first, int is_last) { channel_data *chand = elem->channel_data; - char temp[GPR_LTOA_MIN_BUFSIZE]; GPR_ASSERT(!is_first); GPR_ASSERT(is_last); @@ -437,10 +393,7 @@ static void init_channel_elem(grpc_channel_element *elem, chand->transport_setup = NULL; chand->transport_setup_initiated = 0; chand->args = grpc_channel_args_copy(args); - - gpr_ltoa(GRPC_STATUS_CANCELLED, temp); - chand->cancel_status = - grpc_mdelem_from_strings(metadata_context, "grpc-status", temp); + chand->mdctx = metadata_context; } /* Destructor for channel_data */ @@ -455,7 +408,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } grpc_channel_args_destroy(chand->args); - grpc_mdelem_unref(chand->cancel_status); gpr_mu_destroy(&chand->mu); GPR_ASSERT(chand->waiting_child_count == 0); @@ -463,9 +415,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_channel_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "client-channel", }; + cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "client-channel", +}; grpc_transport_setup_result grpc_client_channel_transport_setup_complete( grpc_channel_stack *channel_stack, grpc_transport *transport, @@ -481,7 +434,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_data **waiting_children; size_t waiting_child_count; size_t i; - grpc_call_op *call_ops; + grpc_transport_op *call_ops; /* build the child filter stack */ child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters); @@ -517,19 +470,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( chand->waiting_child_count = 0; chand->waiting_child_capacity = 0; - call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count); + call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count); for (i = 0; i < waiting_child_count; i++) { - call_ops[i].type = GRPC_SEND_START; - call_ops[i].dir = GRPC_CALL_DOWN; - call_ops[i].flags = waiting_children[i]->s.waiting.start_flags; - call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete; - call_ops[i].user_data = - waiting_children[i]->s.waiting.on_complete_user_data; - call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset; + call_ops[i] = waiting_children[i]->s.waiting_op; if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { waiting_children[i] = NULL; - call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR); + grpc_transport_op_finish_with_failure(&call_ops[i]); } } |