diff options
Diffstat (limited to 'src/core')
28 files changed, 954 insertions, 1383 deletions
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c index 9c0c20af22..3e0fc39fc9 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/channel/census_filter.c @@ -49,6 +49,11 @@ typedef struct call_data { census_op_id op_id; census_rpc_stats stats; gpr_timespec start_ts; + + /* recv callback */ + grpc_stream_op_buffer *recv_ops; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; } call_data; typedef struct channel_data { @@ -60,55 +65,58 @@ static void init_rpc_stats(census_rpc_stats* stats) { stats->cnt = 1; } -static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld, +static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, call_data* calld, channel_data* chand) { grpc_linked_mdelem* m; - for (m = op->data.metadata.list.head; m != NULL; m = m->next) { - if (m->md->key == chand->path_str) { - gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); - census_add_method_tag( - calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); + size_t i; + for (i = 0; i < sopb->nops; i++) { + grpc_stream_op * op = &sopb->ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + for (m = op->data.metadata.list.head; m != NULL; m = m->next) { + if (m->md->key == chand->path_str) { + gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); + census_add_method_tag( + calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); + } } } } -static void client_call_op(grpc_call_element* elem, - grpc_call_element* from_elem, grpc_call_op* op) { +static void client_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) { call_data* calld = elem->call_data; channel_data* chand = elem->channel_data; GPR_ASSERT(calld != NULL); GPR_ASSERT(chand != NULL); GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); - switch (op->type) { - case GRPC_SEND_METADATA: - extract_and_annotate_method_tag(op, calld, chand); - break; - case GRPC_RECV_FINISH: - /* Should we stop timing the rpc here? */ - break; - default: - break; + if (op->send_ops) { + extract_and_annotate_method_tag(op->send_ops, calld, chand); } - /* Always pass control up or down the stack depending on op->dir */ grpc_call_next_op(elem, op); } -static void server_call_op(grpc_call_element* elem, - grpc_call_element* from_elem, grpc_call_op* op) { +static void server_on_done_recv(void *ptr, int success) { + grpc_call_element *elem = ptr; + call_data* calld = elem->call_data; + channel_data* chand = elem->channel_data; + if (success) { + extract_and_annotate_method_tag(calld->recv_ops, calld, chand); + } + calld->on_done_recv(calld->recv_user_data, success); +} + +static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) { call_data* calld = elem->call_data; channel_data* chand = elem->channel_data; GPR_ASSERT(calld != NULL); GPR_ASSERT(chand != NULL); GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); - switch (op->type) { - case GRPC_RECV_METADATA: - extract_and_annotate_method_tag(op, calld, chand); - break; - case GRPC_SEND_FINISH: - /* Should we stop timing the rpc here? */ - break; - default: - break; + if (op->recv_ops) { + /* substitute our callback for the op callback */ + calld->recv_ops = op->recv_ops; + calld->on_done_recv = op->on_done_recv; + calld->recv_user_data = op->recv_user_data; + op->on_done_recv = server_on_done_recv; + op->recv_user_data = elem; } /* Always pass control up or down the stack depending on op->dir */ grpc_call_next_op(elem, op); @@ -180,11 +188,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) { } const grpc_channel_filter grpc_client_census_filter = { - client_call_op, channel_op, sizeof(call_data), client_init_call_elem, + client_start_transport_op, channel_op, sizeof(call_data), client_init_call_elem, client_destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "census-client"}; const grpc_channel_filter grpc_server_census_filter = { - server_call_op, channel_op, sizeof(call_data), server_init_call_elem, + server_start_transport_op, channel_op, sizeof(call_data), server_init_call_elem, server_destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "census-server"}; diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 3a3a3a75b7..c121e27005 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -35,6 +35,7 @@ #include <grpc/support/log.h> #include <stdlib.h> +#include <string.h> int grpc_trace_channel = 0; @@ -181,12 +182,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) { } } -void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) { - grpc_call_element *next_elem = elem + op->dir; - if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) { - grpc_metadata_batch_assert_ok(&op->data.metadata); - } - next_elem->filter->call_op(next_elem, elem, op); +void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) { + grpc_call_element *next_elem = elem + 1; + next_elem->filter->start_transport_op(next_elem, op); } void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) { @@ -205,39 +203,15 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { sizeof(grpc_call_stack))); } -static void do_nothing(void *user_data, grpc_op_error error) {} - void grpc_call_element_send_cancel(grpc_call_element *cur_elem) { - grpc_call_op cancel_op; - cancel_op.type = GRPC_CANCEL_OP; - cancel_op.dir = GRPC_CALL_DOWN; - cancel_op.done_cb = do_nothing; - cancel_op.user_data = NULL; - cancel_op.flags = 0; - cancel_op.bind_pollset = NULL; - grpc_call_next_op(cur_elem, &cancel_op); -} - -void grpc_call_element_send_finish(grpc_call_element *cur_elem) { - grpc_call_op finish_op; - finish_op.type = GRPC_SEND_FINISH; - finish_op.dir = GRPC_CALL_DOWN; - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - finish_op.flags = 0; - finish_op.bind_pollset = NULL; - grpc_call_next_op(cur_elem, &finish_op); + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.cancel_with_status = GRPC_STATUS_CANCELLED; + grpc_call_next_op(cur_elem, &op); } void grpc_call_element_recv_status(grpc_call_element *cur_elem, grpc_status_code status, const char *message) { - grpc_call_op op; - op.type = GRPC_RECV_SYNTHETIC_STATUS; - op.dir = GRPC_CALL_UP; - op.done_cb = do_nothing; - op.user_data = NULL; - op.data.synthetic_status.status = status; - op.data.synthetic_status.message = message; - grpc_call_next_op(cur_elem, &op); + abort(); } diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index addc92b272..75897ff651 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -51,78 +51,11 @@ typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_call_element grpc_call_element; -/* Call operations - things that can be sent and received. - - Threading: - SEND, RECV, and CANCEL ops can be active on a call at the same time, but - only one SEND, one RECV, and one CANCEL can be active at a time. - - If state is shared between send/receive/cancel operations, it is up to - filters to provide their own protection around that. */ -typedef enum { - /* send metadata to the channels peer */ - GRPC_SEND_METADATA, - /* send a message to the channels peer */ - GRPC_SEND_MESSAGE, - /* send a pre-formatted message to the channels peer */ - GRPC_SEND_PREFORMATTED_MESSAGE, - /* send half-close to the channels peer */ - GRPC_SEND_FINISH, - /* request that more data be allowed through flow control */ - GRPC_REQUEST_DATA, - /* metadata was received from the channels peer */ - GRPC_RECV_METADATA, - /* a message was received from the channels peer */ - GRPC_RECV_MESSAGE, - /* half-close was received from the channels peer */ - GRPC_RECV_HALF_CLOSE, - /* full close was received from the channels peer */ - GRPC_RECV_FINISH, - /* a status has been sythesized locally */ - GRPC_RECV_SYNTHETIC_STATUS, - /* the call has been abnormally terminated */ - GRPC_CANCEL_OP -} grpc_call_op_type; - /* The direction of the call. The values of the enums (1, -1) matter here - they are used to increment or decrement a pointer to find the next element to call */ typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir; -/* A single filterable operation to be performed on a call */ -typedef struct { - /* The type of operation we're performing */ - grpc_call_op_type type; - /* The directionality of this call - does the operation begin at the bottom - of the stack and flow up, or does the operation start at the top of the - stack and flow down through the filters. */ - grpc_call_dir dir; - - /* Flags associated with this call: see GRPC_WRITE_* in grpc.h */ - gpr_uint32 flags; - - /* Argument data, matching up with grpc_call_op_type names */ - union { - grpc_byte_buffer *message; - grpc_metadata_batch metadata; - struct { - grpc_status_code status; - const char *message; - } synthetic_status; - } data; - - grpc_pollset *bind_pollset; - - /* Must be called when processing of this call-op is complete. - Signature chosen to match transport flow control callbacks */ - void (*done_cb)(void *user_data, grpc_op_error error); - /* User data to be passed into done_cb */ - void *user_data; -} grpc_call_op; - -/* returns a string representation of op, that can be destroyed with gpr_free */ -char *grpc_call_op_string(grpc_call_op *op); - typedef enum { /* send a goaway message to remote channels indicating that we are going to disconnect in the future */ @@ -170,8 +103,7 @@ typedef struct { typedef struct { /* Called to eg. send/receive data on a call. See grpc_call_next_op on how to call the next element in the stack */ - void (*call_op)(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op); + void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ @@ -272,8 +204,8 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack, /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_call_stack *stack); -/* Call the next operation (depending on call directionality) in a call stack */ -void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op); +/* Call the next operation in a call stack */ +void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op); /* Call the next operation (depending on call directionality) in a channel stack */ void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op); @@ -285,10 +217,9 @@ grpc_channel_stack *grpc_channel_stack_from_top_element( grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); void grpc_call_log_op(char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, grpc_call_op *op); + grpc_call_element *elem, grpc_transport_op *op); void grpc_call_element_send_cancel(grpc_call_element *cur_elem); -void grpc_call_element_send_finish(grpc_call_element *cur_elem); void grpc_call_element_recv_status(grpc_call_element *cur_elem, grpc_status_code status, const char *message); diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index 2cb03829c7..244417384a 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -61,22 +61,11 @@ typedef struct { } lb_channel_data; typedef struct { - grpc_call_element *back; grpc_child_channel *channel; } lb_call_data; -static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { - lb_call_data *calld = elem->call_data; - - switch (op->dir) { - case GRPC_CALL_UP: - calld->back->filter->call_op(calld->back, elem, op); - break; - case GRPC_CALL_DOWN: - grpc_call_next_op(elem, op); - break; - } +static void lb_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { + grpc_call_next_op(elem, op); } /* Currently we assume all channel operations should just be pushed up. */ @@ -165,7 +154,7 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_child_channel_top_filter = { - lb_call_op, lb_channel_op, sizeof(lb_call_data), + lb_start_transport_op, lb_channel_op, sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", }; @@ -282,7 +271,6 @@ grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel, lbelem = LINK_BACK_ELEM_FROM_CALL(stk); lbchand = lbelem->channel_data; lbcalld = lbelem->call_data; - lbcalld->back = parent; lbcalld->channel = channel; gpr_mu_lock(&lbchand->mu); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index bc481e59ca..6ad50cb944 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -82,7 +82,7 @@ struct call_data { /* owning element */ grpc_call_element *elem; - gpr_uint8 got_first_send; + gpr_uint8 got_first_op; call_state state; gpr_timespec deadline; @@ -91,7 +91,7 @@ struct call_data { /* our child call stack */ grpc_child_call *child_call; } active; - grpc_call_op waiting_op; + grpc_transport_op waiting_op; } s; }; @@ -110,9 +110,7 @@ static int prepare_activate(grpc_call_element *elem, return 1; } -static void do_nothing(void *ignored, grpc_op_error error) {} - -static void complete_activate(grpc_call_element *elem, grpc_call_op *op) { +static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; grpc_call_element *child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); @@ -121,17 +119,16 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) { /* continue the start call down the stack, this nees to happen after metadata are flushed*/ - child_elem->filter->call_op(child_elem, elem, op); + child_elem->filter->start_transport_op(child_elem, op); } -static void start_rpc(grpc_call_element *elem, grpc_call_op *op) { +static void start_rpc(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu); if (calld->state == CALL_CANCELLED) { gpr_mu_unlock(&chand->mu); - grpc_metadata_batch_destroy(&op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_ERROR); + grpc_transport_op_finish_with_failure(op); return; } GPR_ASSERT(calld->state == CALL_CREATED); @@ -187,19 +184,10 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { } static void send_up_cancelled_ops(grpc_call_element *elem) { - grpc_call_op finish_op; - /* send up a synthesized status */ - grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled"); - /* send up a finish */ - finish_op.type = GRPC_RECV_FINISH; - finish_op.dir = GRPC_CALL_UP; - finish_op.flags = 0; - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - grpc_call_next_op(elem, &finish_op); + abort(); } -static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { +static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_call_element *child_elem; @@ -209,15 +197,13 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { case CALL_ACTIVE: child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); gpr_mu_unlock(&chand->mu); - child_elem->filter->call_op(child_elem, elem, op); + child_elem->filter->start_transport_op(child_elem, op); return; /* early out */ case CALL_WAITING: - grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata); remove_waiting_child(chand, calld); calld->state = CALL_CANCELLED; gpr_mu_unlock(&chand->mu); send_up_cancelled_ops(elem); - calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR); return; /* early out */ case CALL_CREATED: calld->state = CALL_CANCELLED; @@ -232,40 +218,27 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { abort(); } -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void cc_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { call_data *calld = elem->call_data; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - switch (op->type) { - case GRPC_SEND_METADATA: - if (!calld->got_first_send) { - /* filter out the start event to find which child to send on */ - calld->got_first_send = 1; - start_rpc(elem, op); - } else { - grpc_call_next_op(elem, op); - } - break; - case GRPC_CANCEL_OP: - cancel_rpc(elem, op); - break; - case GRPC_SEND_MESSAGE: - case GRPC_SEND_FINISH: - case GRPC_REQUEST_DATA: - if (calld->state == CALL_ACTIVE) { - grpc_call_element *child_elem = - grpc_child_call_get_top_element(calld->s.active.child_call); - child_elem->filter->call_op(child_elem, elem, op); - } else { - op->done_cb(op->user_data, GRPC_OP_ERROR); - } - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_UP); - grpc_call_next_op(elem, op); - break; + if (op->cancel_with_status != GRPC_STATUS_OK) { + GPR_ASSERT(op->send_ops == NULL); + GPR_ASSERT(op->recv_ops == NULL); + + cancel_rpc(elem, op); + return; + } + + if (!calld->got_first_op) { + calld->got_first_op = 1; + start_rpc(elem, op); + } else { + grpc_call_element *child_elem = + grpc_child_call_get_top_element(calld->s.active.child_call); + child_elem->filter->start_transport_op(child_elem, op); } } @@ -359,7 +332,7 @@ static void init_call_elem(grpc_call_element *elem, calld->elem = elem; calld->state = CALL_CREATED; calld->deadline = gpr_inf_future; - calld->got_first_send = 0; + calld->got_first_op = 0; } /* Destructor for call_data */ @@ -372,9 +345,7 @@ static void destroy_call_elem(grpc_call_element *elem) { if (calld->state == CALL_ACTIVE) { grpc_child_call_destroy(calld->s.active.child_call); } - if (calld->state == CALL_WAITING) { - grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata); - } + GPR_ASSERT(calld->state != CALL_WAITING); } /* Constructor for channel_data */ @@ -417,7 +388,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_channel_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client-channel", }; @@ -436,7 +407,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_data **waiting_children; size_t waiting_child_count; size_t i; - grpc_call_op *call_ops; + grpc_transport_op *call_ops; /* build the child filter stack */ child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters); @@ -472,13 +443,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( chand->waiting_child_count = 0; chand->waiting_child_capacity = 0; - call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count); + call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count); for (i = 0; i < waiting_child_count; i++) { call_ops[i] = waiting_children[i]->s.waiting_op; if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { waiting_children[i] = NULL; - call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR); + grpc_transport_op_finish_with_failure(&call_ops[i]); } } diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 711274bfe1..9e2d92ffbc 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -45,24 +45,13 @@ #include <grpc/support/slice_buffer.h> #define MAX_BUFFER_LENGTH 8192 -/* the protobuf library will (by default) start warning at 100megs */ -#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) typedef struct connected_channel_channel_data { grpc_transport *transport; - gpr_uint32 max_message_length; } channel_data; typedef struct connected_channel_call_data { - grpc_call_element *elem; - grpc_stream_op_buffer outgoing_sopb; - - gpr_uint32 max_message_length; - gpr_uint32 incoming_message_length; - gpr_uint8 reading_message; - gpr_uint8 got_read_close; - gpr_slice_buffer incoming_message; - gpr_uint32 outgoing_buffer_length_estimate; + void *unused; } call_data; /* We perform a small hack to locate transport data alongside the connected @@ -72,91 +61,15 @@ typedef struct connected_channel_call_data { #define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \ (((call_data *)(transport_stream)) - 1) -/* Copy the contents of a byte buffer into stream ops */ -static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, - grpc_stream_op_buffer *sopb) { - size_t i; - - switch (byte_buffer->type) { - case GRPC_BB_SLICE_BUFFER: - for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) { - gpr_slice slice = byte_buffer->data.slice_buffer.slices[i]; - gpr_slice_ref(slice); - grpc_sopb_add_slice(sopb, slice); - } - break; - } -} - -/* Flush queued stream operations onto the transport */ -static void end_bufferable_op(grpc_call_op *op, channel_data *chand, - call_data *calld, int is_last) { - size_t nops; - - if (op->flags & GRPC_WRITE_BUFFER_HINT) { - if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) { - op->done_cb(op->user_data, GRPC_OP_OK); - return; - } - } - - calld->outgoing_buffer_length_estimate = 0; - grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data); - - nops = calld->outgoing_sopb.nops; - calld->outgoing_sopb.nops = 0; - grpc_transport_send_batch(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - calld->outgoing_sopb.ops, nops, is_last); -} - /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void con_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->bind_pollset) { - grpc_transport_add_to_pollset(chand->transport, op->bind_pollset); - } - - switch (op->type) { - case GRPC_SEND_METADATA: - grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); - end_bufferable_op(op, chand, calld, 0); - break; - case GRPC_SEND_MESSAGE: - grpc_sopb_add_begin_message(&calld->outgoing_sopb, - grpc_byte_buffer_length(op->data.message), - op->flags); - /* fall-through */ - case GRPC_SEND_PREFORMATTED_MESSAGE: - copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb); - calld->outgoing_buffer_length_estimate += - (5 + grpc_byte_buffer_length(op->data.message)); - end_bufferable_op(op, chand, calld, 0); - break; - case GRPC_SEND_FINISH: - end_bufferable_op(op, chand, calld, 1); - break; - case GRPC_REQUEST_DATA: - /* re-arm window updates if they were disarmed by finish_message */ - grpc_transport_set_allow_window_updates( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1); - break; - case GRPC_CANCEL_OP: - grpc_transport_abort_stream(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - GRPC_STATUS_CANCELLED); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_UP); - grpc_call_next_op(elem, op); - break; - } + grpc_transport_perform_op(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); } /* Currently we assume all channel operations should just be pushed up. */ @@ -188,14 +101,6 @@ static void init_call_elem(grpc_call_element *elem, int r; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - calld->elem = elem; - grpc_sopb_init(&calld->outgoing_sopb); - - calld->reading_message = 0; - calld->got_read_close = 0; - calld->outgoing_buffer_length_estimate = 0; - calld->max_message_length = chand->max_message_length; - gpr_slice_buffer_init(&calld->incoming_message); r = grpc_transport_init_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), server_transport_data); @@ -207,8 +112,6 @@ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - grpc_sopb_destroy(&calld->outgoing_sopb); - gpr_slice_buffer_destroy(&calld->incoming_message); grpc_transport_destroy_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld)); } @@ -218,28 +121,10 @@ static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *cd = (channel_data *)elem->channel_data; - size_t i; GPR_ASSERT(!is_first); GPR_ASSERT(is_last); GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); cd->transport = NULL; - - cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; - if (args) { - for (i = 0; i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { - if (args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s ignored: it must be an integer", - GRPC_ARG_MAX_MESSAGE_LENGTH); - } else if (args->args[i].value.integer < 0) { - gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", - GRPC_ARG_MAX_MESSAGE_LENGTH); - } else { - cd->max_message_length = args->args[i].value.integer; - } - } - } - } } /* Destructor for channel_data */ @@ -250,15 +135,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_connected_channel_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected", }; -static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport, - grpc_stream *stream, size_t size_hint) { - return gpr_slice_malloc(size_hint); -} - /* Transport callback to accept a new stream... calls up to handle it */ static void accept_stream(void *user_data, grpc_transport *transport, const void *transport_server_data) { @@ -276,168 +156,6 @@ static void accept_stream(void *user_data, grpc_transport *transport, channel_op(elem, NULL, &op); } -static void recv_error(channel_data *chand, call_data *calld, int line, - const char *message) { - gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message); - - if (chand->transport) { - grpc_transport_abort_stream(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - GRPC_STATUS_INVALID_ARGUMENT); - } -} - -static void do_nothing(void *calldata, grpc_op_error error) {} - -static void finish_message(channel_data *chand, call_data *calld) { - grpc_call_element *elem = calld->elem; - grpc_call_op call_op; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - /* if we got all the bytes for this message, call up the stack */ - call_op.type = GRPC_RECV_MESSAGE; - call_op.done_cb = do_nothing; - /* TODO(ctiller): this could be a lot faster if coded directly */ - call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices, - calld->incoming_message.count); - gpr_slice_buffer_reset_and_unref(&calld->incoming_message); - - /* disable window updates until we get a request more from above */ - grpc_transport_set_allow_window_updates( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0); - - GPR_ASSERT(calld->incoming_message.count == 0); - calld->reading_message = 0; - grpc_call_next_op(elem, &call_op); -} - -static void got_metadata(grpc_call_element *elem, - grpc_metadata_batch metadata) { - grpc_call_op op; - op.type = GRPC_RECV_METADATA; - op.dir = GRPC_CALL_UP; - op.flags = 0; - op.data.metadata = metadata; - op.done_cb = do_nothing; - op.user_data = NULL; - - grpc_call_next_op(elem, &op); -} - -/* Handle incoming stream ops from the transport, translating them into - call_ops to pass up the call stack */ -static void recv_batch(void *user_data, grpc_transport *transport, - grpc_stream *stream, grpc_stream_op *ops, - size_t ops_count, grpc_stream_state final_state) { - call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream); - grpc_call_element *elem = calld->elem; - channel_data *chand = elem->channel_data; - grpc_stream_op *stream_op; - grpc_call_op call_op; - size_t i; - gpr_uint32 length; - - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - - for (i = 0; i < ops_count; i++) { - stream_op = ops + i; - switch (stream_op->type) { - case GRPC_OP_FLOW_CTL_CB: - stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1); - break; - case GRPC_NO_OP: - break; - case GRPC_OP_METADATA: - got_metadata(elem, stream_op->data.metadata); - break; - case GRPC_OP_BEGIN_MESSAGE: - /* can't begin a message when we're still reading a message */ - if (calld->reading_message) { - char *message = NULL; - gpr_asprintf(&message, - "Message terminated early; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - return; - } - /* stash away parameters, and prepare for incoming slices */ - length = stream_op->data.begin_message.length; - if (length > calld->max_message_length) { - char *message = NULL; - gpr_asprintf( - &message, - "Maximum message length of %d exceeded by a message of length %d", - calld->max_message_length, length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - } else if (length > 0) { - calld->reading_message = 1; - calld->incoming_message_length = length; - } else { - finish_message(chand, calld); - } - break; - case GRPC_OP_SLICE: - if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) { - gpr_slice_unref(stream_op->data.slice); - break; - } - /* we have to be reading a message to know what to do here */ - if (!calld->reading_message) { - recv_error(chand, calld, __LINE__, - "Received payload data while not reading a message"); - return; - } - /* append the slice to the incoming buffer */ - gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice); - if (calld->incoming_message.length > calld->incoming_message_length) { - /* if we got too many bytes, complain */ - char *message = NULL; - gpr_asprintf(&message, - "Receiving message overflow; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - return; - } else if (calld->incoming_message.length == - calld->incoming_message_length) { - finish_message(chand, calld); - } - } - } - /* if the stream closed, then call up the stack to let it know */ - if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED || - final_state == GRPC_STREAM_CLOSED)) { - calld->got_read_close = 1; - if (calld->reading_message) { - char *message = NULL; - gpr_asprintf(&message, - "Last message truncated; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - } - call_op.type = GRPC_RECV_HALF_CLOSE; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } - if (final_state == GRPC_STREAM_CLOSED) { - call_op.type = GRPC_RECV_FINISH; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } -} - static void transport_goaway(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug) { /* transport got goaway ==> call up and handle it */ @@ -470,7 +188,7 @@ static void transport_closed(void *user_data, grpc_transport *transport) { } const grpc_transport_callbacks connected_channel_transport_callbacks = { - alloc_recv_buffer, accept_stream, recv_batch, + accept_stream, transport_goaway, transport_closed, }; diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 56e12342d7..7287408401 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -39,6 +39,12 @@ typedef struct call_data { grpc_linked_mdelem scheme; grpc_linked_mdelem te_trailers; grpc_linked_mdelem content_type; + int sent_initial_metadata; + + int got_initial_metadata; + grpc_stream_op_buffer *recv_ops; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; } call_data; typedef struct channel_data { @@ -64,22 +70,39 @@ static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) { return md; } -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void hc_on_recv(void *user_data, int success) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + if (success) { + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->got_initial_metadata = 1; + grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); + } + } + calld->on_done_recv(calld->recv_user_data, success); +} + +static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; + size_t i; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - switch (op->type) { - case GRPC_SEND_METADATA: + if (op->send_ops && !calld->sent_initial_metadata) { + size_t nops = op->send_ops->nops; + grpc_stream_op *ops = op->send_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->sent_initial_metadata = 1; /* Send : prefixed headers, which have to be before any application - * layer headers. */ + layer headers. */ grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, grpc_mdelem_ref(channeld->method)); grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme, @@ -88,17 +111,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_mdelem_ref(channeld->te_trailers)); grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, grpc_mdelem_ref(channeld->content_type)); - grpc_call_next_op(elem, op); - break; - case GRPC_RECV_METADATA: - grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); - grpc_call_next_op(elem, op); - break; - default: - /* pass control up or down the stack depending on op->dir */ - grpc_call_next_op(elem, op); break; + } } + + if (op->recv_ops && !calld->got_initial_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->on_done_recv = op->on_done_recv; + calld->recv_user_data = op->recv_user_data; + op->on_done_recv = hc_on_recv; + op->recv_user_data = elem; + } + + grpc_call_next_op(elem, op); } /* Called on special channel events, such as disconnection or new incoming @@ -120,7 +146,11 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) {} + const void *server_transport_data) { + call_data *calld = elem->call_data; + calld->sent_initial_metadata = 0; + calld->got_initial_metadata = 0; +} /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element *elem) { @@ -181,6 +211,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_http_client_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "http-client"}; diff --git a/src/core/channel/http_filter.c b/src/core/channel/http_filter.c deleted file mode 100644 index 453a0422d8..0000000000 --- a/src/core/channel/http_filter.c +++ /dev/null @@ -1,137 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/channel/http_filter.h" -#include <grpc/support/log.h> - -typedef struct call_data { - int unused; /* C89 requires at least one struct element */ -} call_data; - -typedef struct channel_data { - int unused; /* C89 requires at least one struct element */ -} channel_data; - -/* used to silence 'variable not used' warnings */ -static void ignore_unused(void *ignored) {} - -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - ignore_unused(calld); - ignore_unused(channeld); - - switch (op->type) { - default: - /* pass control up or down the stack depending on op->dir */ - grpc_call_next_op(elem, op); - break; - } -} - -/* Called on special channel events, such as disconnection or new incoming - calls on the server */ -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); - - switch (op->type) { - default: - /* pass control up or down the stack depending on op->dir */ - grpc_channel_next_op(elem, op); - break; - } -} - -/* Constructor for call_data */ -static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - /* initialize members */ - calld->unused = channeld->unused; -} - -/* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - ignore_unused(calld); - ignore_unused(channeld); -} - -/* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - /* The first and the last filters tend to be implemented differently to - handle the case that there's no 'next' filter to call on the up or down - path */ - GPR_ASSERT(!is_first); - GPR_ASSERT(!is_last); - - /* initialize members */ - channeld->unused = 0; -} - -/* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element *elem) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); -} - -const grpc_channel_filter grpc_http_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "http"}; diff --git a/src/core/channel/http_filter.h b/src/core/channel/http_filter.h deleted file mode 100644 index 1b116ad61f..0000000000 --- a/src/core/channel/http_filter.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H -#define GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H - -#include "src/core/channel/channel_stack.h" - -/* Processes metadata that is common to both client and server for HTTP2 - transports. */ -extern const grpc_channel_filter grpc_http_filter; - -#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H */ diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 0bfe2f2e30..31ed8a3393 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -52,6 +52,10 @@ typedef struct call_data { gpr_uint8 seen_scheme; gpr_uint8 seen_te_trailers; grpc_linked_mdelem status; + + grpc_stream_op_buffer *recv_ops; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; } call_data; typedef struct channel_data { @@ -143,66 +147,76 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } } -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void hs_on_recv(void *user_data, int success) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + if (success) { + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->got_initial_metadata = 1; + grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); + /* Have we seen the required http2 transport headers? + (:method, :scheme, content-type, with :path and :authority covered + at the channel level right now) */ + if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && + calld->seen_path) { + /* do nothing */ + } else { + if (!calld->seen_path) { + gpr_log(GPR_ERROR, "Missing :path header"); + } + if (!calld->seen_post) { + gpr_log(GPR_ERROR, "Missing :method header"); + } + if (!calld->seen_scheme) { + gpr_log(GPR_ERROR, "Missing :scheme header"); + } + if (!calld->seen_te_trailers) { + gpr_log(GPR_ERROR, "Missing te trailers header"); + } + /* Error this call out */ + success = 0; + grpc_call_element_send_cancel(elem); + } + } + } + calld->on_done_recv(calld->recv_user_data, success); +} + +static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; + size_t i; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - switch (op->type) { - case GRPC_RECV_METADATA: - grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - if (!calld->got_initial_metadata) { - calld->got_initial_metadata = 1; - /* Have we seen the required http2 transport headers? - (:method, :scheme, content-type, with :path and :authority covered - at the channel level right now) */ - if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && - calld->seen_path) { - grpc_call_next_op(elem, op); - } else { - if (!calld->seen_path) { - gpr_log(GPR_ERROR, "Missing :path header"); - } - if (!calld->seen_post) { - gpr_log(GPR_ERROR, "Missing :method header"); - } - if (!calld->seen_scheme) { - gpr_log(GPR_ERROR, "Missing :scheme header"); - } - if (!calld->seen_te_trailers) { - gpr_log(GPR_ERROR, "Missing te trailers header"); - } - /* Error this call out */ - grpc_metadata_batch_destroy(&op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - grpc_call_element_send_cancel(elem); - } - } else { - grpc_call_next_op(elem, op); - } - break; - case GRPC_SEND_METADATA: - /* If we haven't sent status 200 yet, we need to so so because it needs to - come before any non : prefixed metadata. */ - if (!calld->sent_status) { - calld->sent_status = 1; - grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, - grpc_mdelem_ref(channeld->status_ok)); - } - grpc_call_next_op(elem, op); - break; - default: - /* pass control up or down the stack depending on op->dir */ - grpc_call_next_op(elem, op); + if (op->send_ops && !calld->sent_status) { + size_t nops = op->send_ops->nops; + grpc_stream_op *ops = op->send_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->sent_status = 1; + grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, + grpc_mdelem_ref(channeld->status_ok)); break; + } } + + if (op->recv_ops && !calld->got_initial_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->on_done_recv = op->on_done_recv; + calld->recv_user_data = op->recv_user_data; + op->on_done_recv = hs_on_recv; + op->recv_user_data = elem; + } + + grpc_call_next_op(elem, op); } /* Called on special channel events, such as disconnection or new incoming @@ -324,6 +338,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_http_server_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "http-server"}; diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index d987fa2bc1..403b60901b 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -50,8 +50,7 @@ static void ignore_unused(void *ignored) {} - a network event (or similar) from below, to receive something op contains type and call direction information, in addition to the data that is being sent or received. */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -59,12 +58,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, ignore_unused(calld); ignore_unused(channeld); - switch (op->type) { - default: - /* pass control up or down the stack depending on op->dir */ - grpc_call_next_op(elem, op); - break; - } + /* pass control down the stack */ + grpc_call_next_op(elem, op); } /* Called on special channel events, such as disconnection or new incoming @@ -131,6 +126,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_no_op_filter = { - call_op, channel_op, sizeof(call_data), + noop_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "no-op"}; diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 2949805622..18be81308d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -81,9 +81,9 @@ typedef struct { grpc_ioreq_completion_func on_complete; void *user_data; /* a bit mask of which request ops are needed (1u << opid) */ - gpr_uint32 need_mask; + gpr_uint16 need_mask; /* a bit mask of which request ops are now completed */ - gpr_uint32 complete_mask; + gpr_uint16 complete_mask; } reqinfo_master; /* Status data for a request can come from several sources; this @@ -144,12 +144,17 @@ struct grpc_call { gpr_uint8 have_alarm; /* are we currently performing a send operation */ gpr_uint8 sending; + /* are we currently performing a recv operation */ + gpr_uint8 receiving; /* are we currently completing requests */ gpr_uint8 completing; /* pairs with completed_requests */ gpr_uint8 num_completed_requests; - /* flag that we need to request more data */ - gpr_uint8 need_more_data; + /* are we currently reading a message? */ + gpr_uint8 reading_message; + /* flags with bits corresponding to write states allowing us to determine + what was sent */ + gpr_uint16 last_send_contains; /* Active ioreqs. request_set and request_data contain one element per active ioreq @@ -214,6 +219,13 @@ struct grpc_call { size_t send_initial_metadata_count; gpr_timespec send_deadline; + grpc_stream_op_buffer send_ops; + grpc_stream_op_buffer recv_ops; + grpc_stream_state recv_state; + + gpr_slice_buffer incoming_message; + gpr_uint32 incoming_message_length; + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; @@ -234,9 +246,13 @@ struct grpc_call { } while (0) static void do_nothing(void *ignored, grpc_op_error also_ignored) {} -static send_action choose_send_action(grpc_call *call); -static void enact_send_action(grpc_call *call, send_action sa); static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); +static void call_on_done_recv(void *call, int success); +static void call_on_done_send(void *call, int success); +static int fill_send_ops(grpc_call *call, grpc_transport_op *op); +static void execute_op(grpc_call *call, grpc_transport_op *op); +static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); +static void finish_read_ops(grpc_call *call); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data, @@ -267,6 +283,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, call->send_deadline = send_deadline; grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); + grpc_sopb_init(&call->send_ops); + grpc_sopb_init(&call->recv_ops); /* one ref is dropped in response to destroy, the other in stream_closed */ gpr_ref_init(&call->internal_refcount, 2); @@ -314,6 +332,8 @@ static void destroy_call(void *call, int ignored_success) { destroy_legacy_state(c->legacy_state); } grpc_bbq_destroy(&c->incoming_queue); + grpc_sopb_destroy(&c->send_ops); + grpc_sopb_destroy(&c->recv_ops); gpr_free(c); } @@ -359,20 +379,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { return GRPC_CALL_OK; } -static void request_more_data(grpc_call *call) { - grpc_call_op op; - - /* call down */ - op.type = GRPC_REQUEST_DATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - op.bind_pollset = NULL; - - grpc_call_execute_op(call, &op); -} - static int is_op_live(grpc_call *call, grpc_ioreq_op op) { gpr_uint8 set = call->request_set[op]; reqinfo_master *master; @@ -383,17 +389,42 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) { static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } +static int need_more_data(grpc_call *call) { + return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || + is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) || + is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || + is_op_live(call, GRPC_IOREQ_RECV_STATUS) || + is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || + is_op_live(call, GRPC_IOREQ_RECV_CLOSE); +} + static void unlock(grpc_call *call) { - send_action sa = SEND_NOTHING; + grpc_transport_op op; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int completing_requests = 0; - int need_more_data = - call->need_more_data && - (call->write_state >= WRITE_STATE_STARTED || !call->is_client); + int start_op = 0; int i; - if (need_more_data) { - call->need_more_data = 0; + memset(&op, 0, sizeof(op)); + + if (!call->receiving && + (call->write_state >= WRITE_STATE_STARTED || !call->is_client) && + need_more_data(call)) { + op.recv_ops = &call->recv_ops; + op.recv_state = &call->recv_state; + op.on_done_recv = call_on_done_recv; + op.recv_user_data = call; + call->receiving = 1; + grpc_call_internal_ref(call); + start_op = 1; + } + + if (!call->sending) { + if (fill_send_ops(call, &op)) { + call->sending = 1; + grpc_call_internal_ref(call); + start_op = 1; + } } if (!call->completing && call->num_completed_requests != 0) { @@ -405,22 +436,10 @@ static void unlock(grpc_call *call) { grpc_call_internal_ref(call); } - if (!call->sending) { - sa = choose_send_action(call); - if (sa != SEND_NOTHING) { - call->sending = 1; - grpc_call_internal_ref(call); - } - } - gpr_mu_unlock(&call->mu); - if (need_more_data) { - request_more_data(call); - } - - if (sa != SEND_NOTHING) { - enact_send_action(call, sa); + if (start_op) { + execute_op(call, &op); } if (completing_requests > 0) { @@ -554,64 +573,137 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, } } -static void finish_send_op(grpc_call *call, grpc_ioreq_op op, write_state ws, - grpc_op_error error) { +static void call_on_done_send(void *pc, int success) { + grpc_call *call = pc; + grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR; lock(call); - finish_ioreq_op(call, op, error); + if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error); + } + if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) { + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error); + } + if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) { + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, error); + } call->sending = 0; - call->write_state = ws; unlock(call); grpc_call_internal_unref(call, 0); } -static void finish_write_step(void *pc, grpc_op_error error) { - finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, WRITE_STATE_STARTED, error); +static void finish_message(grpc_call *call) { + /* TODO(ctiller): this could be a lot faster if coded directly */ + grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create( + call->incoming_message.slices, call->incoming_message.count); + gpr_slice_buffer_reset_and_unref(&call->incoming_message); + + grpc_bbq_push(&call->incoming_queue, byte_buffer); + + GPR_ASSERT(call->incoming_message.count == 0); + call->reading_message = 0; } -static void finish_finish_step(void *pc, grpc_op_error error) { - finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, WRITE_STATE_WRITE_CLOSED, error); +static int begin_message(grpc_call *call, grpc_begin_message msg) { + /* can't begin a message when we're still reading a message */ + if (call->reading_message) { + char *message = NULL; + gpr_asprintf( + &message, "Message terminated early; read %d bytes, expected %d", + (int)call->incoming_message.length, (int)call->incoming_message_length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } + /* stash away parameters, and prepare for incoming slices */ + if (msg.length > grpc_channel_get_max_message_length(call->channel)) { + char *message = NULL; + gpr_asprintf( + &message, + "Maximum message length of %d exceeded by a message of length %d", + grpc_channel_get_max_message_length(call->channel), msg.length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } else if (msg.length > 0) { + call->reading_message = 1; + call->incoming_message_length = msg.length; + return 1; + } else { + finish_message(call); + return 1; + } } -static void finish_start_step(void *pc, grpc_op_error error) { - finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, WRITE_STATE_STARTED, - error); +static int add_slice_to_message(grpc_call *call, gpr_slice slice) { + if (GPR_SLICE_LENGTH(slice) == 0) { + gpr_slice_unref(slice); + return 1; + } + /* we have to be reading a message to know what to do here */ + if (!call->reading_message) { + grpc_call_cancel_with_status( + call, GRPC_STATUS_INVALID_ARGUMENT, + "Received payload data while not reading a message"); + return 0; + } + /* append the slice to the incoming buffer */ + gpr_slice_buffer_add(&call->incoming_message, slice); + if (call->incoming_message.length > call->incoming_message_length) { + /* if we got too many bytes, complain */ + char *message = NULL; + gpr_asprintf( + &message, "Receiving message overflow; read %d bytes, expected %d", + (int)call->incoming_message.length, (int)call->incoming_message_length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } else if (call->incoming_message.length == call->incoming_message_length) { + finish_message(call); + return 1; + } else { + return 1; + } } -static send_action choose_send_action(grpc_call *call) { - switch (call->write_state) { - case WRITE_STATE_INITIAL: - if (is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) { - if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE) || - is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { - return SEND_BUFFERED_INITIAL_METADATA; - } else { - return SEND_INITIAL_METADATA; - } - } - return SEND_NOTHING; - case WRITE_STATE_STARTED: - if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { - if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { - return SEND_BUFFERED_MESSAGE; - } else { - return SEND_MESSAGE; - } - } else if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK); - if (call->is_client) { - return SEND_FINISH; - } else { - return SEND_TRAILING_METADATA_AND_FINISH; - } - } - return SEND_NOTHING; - case WRITE_STATE_WRITE_CLOSED: - return SEND_NOTHING; +static void call_on_done_recv(void *pc, int success) { + grpc_call *call = pc; + size_t i; + int unref = 0; + lock(call); + for (i = 0; success && i < call->recv_ops.nops; i++) { + grpc_stream_op *op = &call->recv_ops.ops[i]; + switch (op->type) { + case GRPC_NO_OP: + break; + case GRPC_OP_METADATA: + recv_metadata(call, &op->data.metadata); + break; + case GRPC_OP_BEGIN_MESSAGE: + success = begin_message(call, op->data.begin_message); + break; + case GRPC_OP_SLICE: + success = add_slice_to_message(call, op->data.slice); + break; + } + } + if (call->recv_state == GRPC_STREAM_RECV_CLOSED) { + GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED); + call->read_state = READ_STATE_READ_CLOSED; + } + if (call->recv_state == GRPC_STREAM_CLOSED) { + GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); + call->read_state = READ_STATE_STREAM_CLOSED; + unref = 1; + } + if (!success) { + abort(); + } + finish_read_ops(call); + unlock(call); + + if (unref) { + grpc_call_internal_unref(call, 0); } - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - return SEND_NOTHING; } static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, @@ -639,97 +731,100 @@ static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, return out; } -static void enact_send_action(grpc_call *call, send_action sa) { +/* Copy the contents of a byte buffer into stream ops */ +static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, + grpc_stream_op_buffer *sopb) { + size_t i; + + switch (byte_buffer->type) { + case GRPC_BB_SLICE_BUFFER: + for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) { + gpr_slice slice = byte_buffer->data.slice_buffer.slices[i]; + gpr_slice_ref(slice); + grpc_sopb_add_slice(sopb, slice); + } + break; + } +} + +static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { grpc_ioreq_data data; - grpc_call_op op; + grpc_metadata_batch mdb; size_t i; - gpr_uint32 flags = 0; char status_str[GPR_LTOA_MIN_BUFSIZE]; + GPR_ASSERT(op->send_ops == NULL); - switch (sa) { - case SEND_NOTHING: - abort(); - break; - case SEND_BUFFERED_INITIAL_METADATA: - flags |= GRPC_WRITE_BUFFER_HINT; - /* fallthrough */ - case SEND_INITIAL_METADATA: + switch (call->write_state) { + case WRITE_STATE_INITIAL: + if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) { + break; + } data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.metadata.list = chain_metadata_from_app( - call, data.send_metadata.count, data.send_metadata.metadata); - op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; - op.data.metadata.deadline = call->send_deadline; + mdb.list = chain_metadata_from_app(call, data.send_metadata.count, + data.send_metadata.metadata); + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = call->send_deadline; for (i = 0; i < call->send_initial_metadata_count; i++) { - grpc_metadata_batch_link_head(&op.data.metadata, - &call->send_initial_metadata[i]); + grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]); } - call->send_initial_metadata_count = 0; - op.done_cb = finish_start_step; - op.user_data = call; - op.bind_pollset = grpc_cq_pollset(call->cq); - grpc_call_execute_op(call, &op); - break; - case SEND_BUFFERED_MESSAGE: - flags |= GRPC_WRITE_BUFFER_HINT; - /* fallthrough */ - case SEND_MESSAGE: - data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.message = data.send_message; - op.done_cb = finish_write_step; - op.user_data = call; - op.bind_pollset = NULL; - grpc_call_execute_op(call, &op); - break; - case SEND_TRAILING_METADATA_AND_FINISH: - /* send trailing metadata */ - data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.metadata.list = chain_metadata_from_app( - call, data.send_metadata.count, data.send_metadata.metadata); - op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; - op.data.metadata.deadline = call->send_deadline; - op.bind_pollset = NULL; - /* send status */ - /* TODO(ctiller): cache common status values */ - data = call->request_data[GRPC_IOREQ_SEND_STATUS]; - gpr_ltoa(data.send_status.code, status_str); - grpc_metadata_batch_add_tail( - &op.data.metadata, &call->status_link, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, status_str))); - if (data.send_status.details) { - grpc_metadata_batch_add_tail( - &op.data.metadata, &call->details_link, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, - data.send_status.details))); + grpc_sopb_add_metadata(&call->send_ops, mdb); + op->send_ops = &call->send_ops; + op->bind_pollset = grpc_cq_pollset(call->cq); + call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; + call->write_state = WRITE_STATE_STARTED; + /* fall through intended */ + case WRITE_STATE_STARTED: + if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { + data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; + grpc_sopb_add_begin_message( + &call->send_ops, grpc_byte_buffer_length(data.send_message), 0); + copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops); + op->send_ops = &call->send_ops; + call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE; + } + if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { + op->is_last_send = 1; + op->send_ops = &call->send_ops; + call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE; + call->write_state = WRITE_STATE_WRITE_CLOSED; + if (!call->is_client) { + /* send trailing metadata */ + data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; + mdb.list = chain_metadata_from_app(call, data.send_metadata.count, + data.send_metadata.metadata); + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = call->send_deadline; + /* send status */ + /* TODO(ctiller): cache common status values */ + data = call->request_data[GRPC_IOREQ_SEND_STATUS]; + gpr_ltoa(data.send_status.code, status_str); + grpc_metadata_batch_add_tail( + &mdb, &call->status_link, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, status_str))); + if (data.send_status.details) { + grpc_metadata_batch_add_tail( + &mdb, &call->details_link, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref( + grpc_channel_get_message_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, + data.send_status.details))); + } + } } - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - /* fallthrough: see choose_send_action for details */ - case SEND_FINISH: - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = finish_finish_step; - op.user_data = call; - op.bind_pollset = NULL; - grpc_call_execute_op(call, &op); break; + case WRITE_STATE_WRITE_CLOSED: + break; + } + if (op->send_ops) { + op->on_done_send = call_on_done_send; + op->send_user_data = call; } + return op->send_ops != NULL; } static grpc_call_error start_ioreq_error(grpc_call *call, @@ -838,10 +933,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, master->on_complete = completion; master->user_data = user_data; - if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) { - call->need_more_data = 1; - } - finish_read_ops(call); early_out_write_ops(call); @@ -871,19 +962,12 @@ void grpc_call_destroy(grpc_call *c) { grpc_call_internal_unref(c, 1); } -grpc_call_error grpc_call_cancel(grpc_call *c) { - grpc_call_element *elem; - grpc_call_op op; - - op.type = GRPC_CANCEL_OP; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - op.bind_pollset = NULL; +grpc_call_error grpc_call_cancel(grpc_call *call) { + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.cancel_with_status = GRPC_STATUS_CANCELLED; - elem = CALL_ELEM_FROM_CALL(c, 0); - elem->filter->call_op(elem, NULL, &op); + execute_op(call, &op); return GRPC_CALL_OK; } @@ -901,11 +985,10 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return grpc_call_cancel(c); } -void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { +static void execute_op(grpc_call *call, grpc_transport_op *op) { grpc_call_element *elem; - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, op); + elem->filter->start_transport_op(elem, op); } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { @@ -934,28 +1017,6 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); } -static void set_read_state_locked(grpc_call *call, read_state state) { - GPR_ASSERT(call->read_state < state); - call->read_state = state; - finish_read_ops(call); -} - -static void set_read_state(grpc_call *call, read_state state) { - lock(call); - set_read_state_locked(call, state); - unlock(call); -} - -void grpc_call_read_closed(grpc_call_element *elem) { - set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED); -} - -void grpc_call_stream_closed(grpc_call_element *elem) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - set_read_state(call, READ_STATE_STREAM_CLOSED); - grpc_call_internal_unref(call, 0); -} - /* we offset status by a small amount when storing it into transport metadata as metadata cannot store a 0 value (which is used as OK for grpc_status_codes */ @@ -979,35 +1040,13 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { return status; } -void grpc_call_recv_message(grpc_call_element *elem, - grpc_byte_buffer *byte_buffer) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - grpc_bbq_push(&call->incoming_queue, byte_buffer); - finish_read_ops(call); - unlock(call); -} - -void grpc_call_recv_synthetic_status(grpc_call_element *elem, - grpc_status_code status, - const char *message) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - set_status_code(call, STATUS_FROM_CORE, status); - set_status_details(call, STATUS_FROM_CORE, - grpc_mdstr_from_string(call->metadata_context, message)); - unlock(call); -} - -int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); +static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { grpc_linked_mdelem *l; grpc_metadata_array *dest; grpc_metadata *mdusr; int is_trailing; grpc_mdctx *mdctx = call->metadata_context; - lock(call); is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; for (l = md->list.head; l != NULL; l = l->next) { grpc_mdelem *md = l->md; @@ -1043,9 +1082,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { set_deadline_alarm(call, md->deadline); } if (!is_trailing) { - set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA); + call->read_state = READ_STATE_GOT_INITIAL_METADATA; } - unlock(call); grpc_mdctx_lock(mdctx); for (l = md->list.head; l; l = l->next) { @@ -1055,8 +1093,6 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { grpc_mdctx_locked_mdelem_unref(mdctx, l->md); } grpc_mdctx_unlock(mdctx); - - return !is_trailing; } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { diff --git a/src/core/surface/call.h b/src/core/surface/call.h index f8d0915349..199beb1738 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -96,27 +96,12 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); void grpc_call_internal_ref(grpc_call *call); void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); -/* Helpers for grpc_client, grpc_server filters to publish received data to - the completion queue/surface layer */ -/* receive metadata - returns 1 if this was initial metadata */ -int grpc_call_recv_metadata(grpc_call_element *surface_element, - grpc_metadata_batch *md); -void grpc_call_recv_message(grpc_call_element *surface_element, - grpc_byte_buffer *message); -void grpc_call_read_closed(grpc_call_element *surface_element); -void grpc_call_stream_closed(grpc_call_element *surface_element); - -void grpc_call_execute_op(grpc_call *call, grpc_call_op *op); grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); -void grpc_call_recv_synthetic_status(grpc_call_element *elem, - grpc_status_code status, - const char *message); - /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 29b042e7c1..78f9144c19 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -52,6 +52,7 @@ typedef struct registered_call { struct grpc_channel { int is_client; gpr_refcount refs; + gpr_uint32 max_message_length; grpc_mdctx *metadata_context; grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_message_string; @@ -68,9 +69,13 @@ struct grpc_channel { #define CHANNEL_FROM_TOP_ELEM(top_elem) \ CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) +/* the protobuf library will (by default) start warning at 100megs */ +#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) + grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t num_filters, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { + size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); @@ -88,6 +93,24 @@ grpc_channel *grpc_channel_create_from_filters( CHANNEL_STACK_FROM_CHANNEL(channel)); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; + + channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + if (args) { + for (i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { + if (args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else if (args->args[i].value.integer < 0) { + gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else { + channel->max_message_length = args->args[i].value.integer; + } + } + } + } + return channel; } @@ -219,3 +242,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) { return channel->grpc_message_string; } + +gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) { + return channel->max_message_length; +} diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index d3e51185ee..05d57a905b 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -44,6 +44,7 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); +gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); void grpc_client_channel_closed(grpc_channel_element *elem); diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 3104b1d00d..5f3e9bec0c 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -44,7 +44,6 @@ #include "src/core/channel/client_setup.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" -#include "src/core/channel/http_filter.h" #include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" @@ -176,8 +175,7 @@ static void done_setup(void *sp) { static grpc_transport_setup_result complete_setup(void *channel_stack, grpc_transport *transport, grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter, - &grpc_http_filter}; + static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter}; return grpc_client_channel_transport_setup_complete( channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index 2f898ff7d7..4669c59ee4 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -43,32 +43,9 @@ typedef struct { void *unused; } call_data; typedef struct { void *unused; } channel_data; -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void client_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - switch (op->type) { - case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, &op->data.metadata); - break; - case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message); - op->done_cb(op->user_data, GRPC_OP_OK); - break; - case GRPC_RECV_HALF_CLOSE: - grpc_call_read_closed(elem); - break; - case GRPC_RECV_FINISH: - grpc_call_stream_closed(elem); - break; - case GRPC_RECV_SYNTHETIC_STATUS: - grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status, - op->data.synthetic_status.message); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_call_next_op(elem, op); - } + grpc_call_next_op(elem, op); } static void channel_op(grpc_channel_element *elem, @@ -104,6 +81,6 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} const grpc_channel_filter grpc_client_surface_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + client_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client", }; diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 78170806f1..4e9eb808d7 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -46,22 +46,9 @@ typedef struct { void *unused; } call_data; typedef struct { void *unused; } channel_data; -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void lame_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - switch (op->type) { - case GRPC_SEND_METADATA: - grpc_metadata_batch_destroy(&op->data.metadata); - grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN, - "Rpc sent on a lame channel."); - grpc_call_stream_closed(elem); - break; - default: - break; - } - - op->done_cb(op->user_data, GRPC_OP_ERROR); + grpc_transport_op_finish_with_failure(op); } static void channel_op(grpc_channel_element *elem, @@ -93,7 +80,7 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} static const grpc_channel_filter lame_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "lame-client", }; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index e771929870..82d1323a4b 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -173,13 +173,19 @@ struct call_data { grpc_call *call; call_state state; - gpr_timespec deadline; grpc_mdstr *path; grpc_mdstr *host; + gpr_timespec deadline; + int got_initial_metadata; legacy_data *legacy; grpc_completion_queue *cq_new; + grpc_stream_op_buffer *recv_ops; + grpc_stream_state *recv_state; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; + call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; }; @@ -371,46 +377,6 @@ static void kill_zombie(void *elem, int success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } -static void stream_closed(grpc_call_element *elem) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->server->mu); - switch (calld->state) { - case ACTIVATED: - break; - case PENDING: - call_list_remove(calld, PENDING_START); - /* fallthrough intended */ - case NOT_STARTED: - calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); - break; - case ZOMBIED: - break; - } - gpr_mu_unlock(&chand->server->mu); - grpc_call_stream_closed(elem); -} - -static void read_closed(grpc_call_element *elem) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->server->mu); - switch (calld->state) { - case ACTIVATED: - case PENDING: - grpc_call_read_closed(elem); - break; - case NOT_STARTED: - calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); - break; - case ZOMBIED: - break; - } - gpr_mu_unlock(&chand->server->mu); -} - static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; channel_data *chand = elem->channel_data; @@ -425,33 +391,69 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, - grpc_call_op *op) { +static void server_on_recv(void *ptr, int success) { + grpc_call_element *elem = ptr; call_data *calld = elem->call_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - switch (op->type) { - case GRPC_RECV_METADATA: + channel_data *chand = elem->channel_data; + + if (success && !calld->got_initial_metadata) { + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - if (grpc_call_recv_metadata(elem, &op->data.metadata)) { + if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) { calld->deadline = op->data.metadata.deadline; - start_new_rpc(elem); } + calld->got_initial_metadata = 1; + start_new_rpc(elem); break; - case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message); - op->done_cb(op->user_data, GRPC_OP_OK); - break; - case GRPC_RECV_HALF_CLOSE: - read_closed(elem); - break; - case GRPC_RECV_FINISH: - stream_closed(elem); + } + } + + switch (*calld->recv_state) { + case GRPC_STREAM_OPEN: break; + case GRPC_STREAM_SEND_CLOSED: break; + case GRPC_STREAM_RECV_CLOSED: + gpr_mu_lock(&chand->server->mu); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + } + gpr_mu_unlock(&chand->server->mu); break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_call_next_op(elem, op); + case GRPC_STREAM_CLOSED: + gpr_mu_lock(&chand->server->mu); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + } else if (calld->state == PENDING) { + call_list_remove(calld, PENDING_START); + } + gpr_mu_unlock(&chand->server->mu); break; } + + calld->on_done_recv(calld->recv_user_data, success); +} + +static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { + call_data *calld = elem->call_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + if (op->recv_ops) { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->recv_state = op->recv_state; + calld->on_done_recv = op->on_done_recv; + calld->recv_user_data = op->recv_user_data; + op->on_done_recv = server_on_recv; + op->recv_user_data = elem; + } + + grpc_call_next_op(elem, op); } static void channel_op(grpc_channel_element *elem, @@ -592,7 +594,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server", }; diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index f3b9219f8b..ebde5095a9 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -33,7 +33,6 @@ #include <grpc/grpc.h> -#include "src/core/channel/http_filter.h" #include "src/core/channel/http_server_filter.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_server.h" @@ -46,8 +45,7 @@ static grpc_transport_setup_result setup_transport(void *server, grpc_transport *transport, grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter, - &grpc_http_filter}; + static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter}; return grpc_server_setup_transport(server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); } diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 5ca31d6bc7..9b62aefd0b 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -481,11 +481,10 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, break; case GRPC_OP_METADATA: grpc_metadata_batch_assert_ok(&op->data.metadata); - case GRPC_OP_FLOW_CTL_CB: - /* these just get copied as they don't impact the number of flow - controlled bytes */ - grpc_sopb_append(outops, op, 1); - curop++; + /* these just get copied as they don't impact the number of flow + controlled bytes */ + grpc_sopb_append(outops, op, 1); + curop++; break; case GRPC_OP_BEGIN_MESSAGE: /* begin op: for now we just convert the op to a slice and fall @@ -567,10 +566,6 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, GPR_ERROR, "These stream ops should be filtered out by grpc_chttp2_preencode"); abort(); - case GRPC_OP_FLOW_CTL_CB: - op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK); - curop++; - break; case GRPC_OP_METADATA: /* Encode a metadata batch; store the returned values, representing a metadata element that needs to be unreffed back into the metadata diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index e32ee284e0..9c2af560c1 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -91,10 +91,9 @@ typedef enum { /* streams that are waiting to start because there are too many concurrent streams on the connection */ WAITING_FOR_CONCURRENCY, - /* streams that want to callback the application */ - PENDING_CALLBACKS, - /* streams that *ARE* calling back to the application */ - EXECUTING_CALLBACKS, + /* streams that have finished reading: we wait until unlock to coalesce + all changes into one callback */ + FINISHED_READ_OP, STREAM_LIST_COUNT /* must be last */ } stream_list_id; @@ -141,6 +140,12 @@ typedef enum { DTS_FRAME } deframe_transport_state; +typedef enum { + WRITE_STATE_OPEN, + WRITE_STATE_QUEUED_CLOSE, + WRITE_STATE_SENT_CLOSE +} WRITE_STATE; + typedef struct { stream *head; stream *tail; @@ -182,6 +187,18 @@ typedef struct { gpr_slice debug; } pending_goaway; +typedef struct { + void (*cb)(void *user_data, int success); + void *user_data; + int success; +} op_closure; + +typedef struct { + op_closure *callbacks; + size_t count; + size_t capacity; +} op_closure_array; + struct transport { grpc_transport base; /* must be first */ const grpc_transport_callbacks *cb; @@ -202,6 +219,10 @@ struct transport { gpr_uint8 closed; error_state error_state; + /* queued callbacks */ + op_closure_array pending_callbacks; + op_closure_array executing_callbacks; + /* stream indexing */ gpr_uint32 next_stream_id; gpr_uint32 last_incoming_stream_id; @@ -281,14 +302,15 @@ struct stream { /* when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ - gpr_uint8 queued_write_closed; - gpr_uint8 sending_write_closed; - gpr_uint8 sent_write_closed; + WRITE_STATE write_state; + gpr_uint8 send_closed; gpr_uint8 read_closed; gpr_uint8 cancelled; - gpr_uint8 allow_window_updates; gpr_uint8 published_close; + op_closure send_done_closure; + op_closure recv_done_closure; + stream_link links[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT]; @@ -299,7 +321,10 @@ struct stream { gpr_timespec incoming_deadline; /* sops from application */ - grpc_stream_op_buffer outgoing_sopb; + grpc_stream_op_buffer *outgoing_sopb; + grpc_stream_op_buffer *incoming_sopb; + grpc_stream_state *publish_state; + grpc_stream_state published_state; /* sops that have passed flow control to be written */ grpc_stream_op_buffer writing_sopb; @@ -348,6 +373,13 @@ static void become_skip_parser(transport *t); static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error); +static void schedule_cb(transport *t, op_closure closure, int success); +static void maybe_finish_read(transport *t, stream *s); +static void maybe_join_window_updates(transport *t, stream *s); +static void finish_reads(transport *t); +static void add_to_pollset_locked(transport *t, grpc_pollset *pollset); + + /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -416,6 +448,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN); + memset(t, 0, sizeof(*t)); + t->base.vtable = &vtable; t->ep = ep; /* one ref is for destroy, the other for when ep becomes NULL */ @@ -427,27 +461,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, t->str_grpc_timeout = grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); t->reading = 1; - t->writing = 0; t->error_state = ERROR_STATE_NONE; t->next_stream_id = is_client ? 1 : 2; - t->last_incoming_stream_id = 0; - t->destroying = 0; - t->closed = 0; t->is_client = is_client; t->outgoing_window = DEFAULT_WINDOW; t->incoming_window = DEFAULT_WINDOW; t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; - t->expect_continuation_stream_id = 0; - t->pings = NULL; - t->ping_count = 0; - t->ping_capacity = 0; t->ping_counter = gpr_now().tv_nsec; grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx); grpc_chttp2_goaway_parser_init(&t->goaway_parser); - t->pending_goaways = NULL; - t->num_pending_goaways = 0; - t->cap_pending_goaways = 0; gpr_slice_buffer_init(&t->outbuf); gpr_slice_buffer_init(&t->qbuf); grpc_sopb_init(&t->nuke_later_sopb); @@ -462,7 +485,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, needed. TODO(ctiller): tune this */ grpc_chttp2_stream_map_init(&t->stream_map, 8); - memset(&t->lists, 0, sizeof(t->lists)); /* copy in initial settings to all setting sets */ for (i = 0; i < NUM_SETTING_SETS; i++) { @@ -503,7 +525,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, gpr_mu_lock(&t->mu); t->calling_back = 1; - ref_transport(t); + ref_transport(t); /* matches unref at end of this function */ gpr_mu_unlock(&t->mu); sr = setup(arg, &t->base, t->metadata_context); @@ -515,7 +537,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, if (t->destroying) gpr_cv_signal(&t->cv); unlock(t); - ref_transport(t); + ref_transport(t); /* matches unref inside recv_data */ recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); unref_transport(t); @@ -577,6 +599,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, transport *t = (transport *)gt; stream *s = (stream *)gs; + memset(s, 0, sizeof(*s)); + ref_transport(t); if (!server_data) { @@ -592,20 +616,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->incoming_window = t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->queued_write_closed = 0; - s->sending_write_closed = 0; - s->sent_write_closed = 0; - s->read_closed = 0; - s->cancelled = 0; - s->allow_window_updates = 0; - s->published_close = 0; - s->incoming_metadata_count = 0; - s->incoming_metadata_capacity = 0; - s->incoming_metadata = NULL; s->incoming_deadline = gpr_inf_future; - memset(&s->links, 0, sizeof(s->links)); - memset(&s->included, 0, sizeof(s->included)); - grpc_sopb_init(&s->outgoing_sopb); grpc_sopb_init(&s->writing_sopb); grpc_sopb_init(&s->callback_sopb); grpc_chttp2_data_parser_init(&s->parser); @@ -642,7 +653,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { gpr_mu_unlock(&t->mu); - grpc_sopb_destroy(&s->outgoing_sopb); + GPR_ASSERT(s->outgoing_sopb == NULL); grpc_sopb_destroy(&s->writing_sopb); grpc_sopb_destroy(&s->callback_sopb); grpc_chttp2_data_parser_destroy(&s->parser); @@ -708,8 +719,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) { } static void stream_list_join(transport *t, stream *s, stream_list_id id) { - if (id == PENDING_CALLBACKS) - GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE); if (s->included[id]) { return; } @@ -762,6 +771,8 @@ static void unlock(transport *t) { finalize_cancellations(t); } + finish_reads(t); + /* gather any callbacks that need to be made */ if (!t->calling_back && cb) { perform_callbacks = prepare_callbacks(t); @@ -865,22 +876,23 @@ static int prepare_write(transport *t) { while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) && s->outgoing_window > 0) { window_delta = grpc_chttp2_preencode( - s->outgoing_sopb.ops, &s->outgoing_sopb.nops, + s->outgoing_sopb->ops, &s->outgoing_sopb->nops, GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); t->outgoing_window -= window_delta; s->outgoing_window -= window_delta; - s->sending_write_closed = - s->queued_write_closed && s->outgoing_sopb.nops == 0; - if (s->writing_sopb.nops > 0 || s->sending_write_closed) { + if (s->write_state == WRITE_STATE_QUEUED_CLOSE && s->outgoing_sopb->nops == 0) { + s->send_closed = 1; + } + if (s->writing_sopb.nops > 0 || s->send_closed) { stream_list_join(t, s, WRITING); } - /* if there are still writes to do and the stream still has window - available, then schedule a further write */ - if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) { - GPR_ASSERT(!t->outgoing_window); - stream_list_add_tail(t, s, WRITABLE); + /* we should either exhaust window or have no ops left, but not both */ + GPR_ASSERT(s->outgoing_sopb->nops == 0 || s->outgoing_window <= 0); + if (s->outgoing_sopb->nops == 0) { + s->outgoing_sopb = NULL; + schedule_cb(t, s->send_done_closure, 1); } } @@ -912,10 +924,10 @@ static void finalize_outbuf(transport *t) { while ((s = stream_list_remove_head(t, WRITING))) { grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops, - s->sending_write_closed, s->id, &t->hpack_compressor, + s->send_closed, s->id, &t->hpack_compressor, &t->outbuf); s->writing_sopb.nops = 0; - if (s->sending_write_closed) { + if (s->send_closed) { stream_list_join(t, s, WRITTEN_CLOSED); } } @@ -929,8 +941,10 @@ static void finish_write_common(transport *t, int success) { drop_connection(t); } while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) { - s->sent_write_closed = 1; - if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS); + s->write_state = WRITE_STATE_SENT_CLOSE; + if (!s->cancelled) { + maybe_finish_read(t, s); + } } t->outbuf.count = 0; t->outbuf.length = 0; @@ -988,43 +1002,57 @@ static void maybe_start_some_streams(transport *t) { } } -static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, - size_t ops_count, int is_last) { +static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) { transport *t = (transport *)gt; stream *s = (stream *)gs; lock(t); - if (is_last) { - s->queued_write_closed = 1; - } - if (!s->cancelled) { - grpc_sopb_append(&s->outgoing_sopb, ops, ops_count); - if (s->id == 0) { - stream_list_join(t, s, WAITING_FOR_CONCURRENCY); - maybe_start_some_streams(t); + if (op->send_ops) { + GPR_ASSERT(s->outgoing_sopb == NULL); + s->send_done_closure.cb = op->on_done_send; + s->send_done_closure.user_data = op->send_user_data; + if (!s->cancelled) { + s->outgoing_sopb = op->send_ops; + if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) { + s->write_state = WRITE_STATE_QUEUED_CLOSE; + } + if (s->id == 0) { + stream_list_join(t, s, WAITING_FOR_CONCURRENCY); + maybe_start_some_streams(t); + } else if (s->outgoing_window > 0) { + stream_list_join(t, s, WRITABLE); + } } else { - stream_list_join(t, s, WRITABLE); + schedule_nuke_sopb(t, op->send_ops); + schedule_cb(t, s->send_done_closure, 0); } - } else { - grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count); } - if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed && - !s->published_close) { - stream_list_join(t, s, PENDING_CALLBACKS); + + if (op->recv_ops) { + GPR_ASSERT(s->incoming_sopb == NULL); + s->recv_done_closure.cb = op->on_done_recv; + s->recv_done_closure.user_data = op->recv_user_data; + if (!s->cancelled) { + s->incoming_sopb = op->recv_ops; + s->incoming_sopb->nops = 0; + s->publish_state = op->recv_state; + maybe_finish_read(t, s); + maybe_join_window_updates(t, s); + } else { + schedule_cb(t, s->recv_done_closure, 0); + } } - unlock(t); -} + if (op->bind_pollset) { + add_to_pollset_locked(t, op->bind_pollset); + } -static void abort_stream(grpc_transport *gt, grpc_stream *gs, - grpc_status_code status) { - transport *t = (transport *)gt; - stream *s = (stream *)gs; + if (op->cancel_with_status != GRPC_STATUS_OK) { + cancel_stream(t, s, op->cancel_with_status, grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), + 1); + } - lock(t); - cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status), - 1); unlock(t); } @@ -1063,8 +1091,8 @@ static void finalize_cancellations(transport *t) { while ((s = stream_list_remove_head(t, CANCELLED))) { s->read_closed = 1; - s->sent_write_closed = 1; - stream_list_join(t, s, PENDING_CALLBACKS); + s->write_state = WRITE_STATE_SENT_CLOSE; + maybe_finish_read(t, s); } } @@ -1088,12 +1116,15 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, if (s) { /* clear out any unreported input & output: nobody cares anymore */ - had_outgoing = s->outgoing_sopb.nops != 0; + had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0; schedule_nuke_sopb(t, &s->parser.incoming_sopb); - schedule_nuke_sopb(t, &s->outgoing_sopb); + if (s->outgoing_sopb) { + schedule_nuke_sopb(t, s->outgoing_sopb); + schedule_cb(t, s->send_done_closure, 0); + } if (s->cancelled) { send_rst = 0; - } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) { + } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE || had_outgoing) { s->cancelled = 1; stream_list_join(t, s, CANCELLED); @@ -1111,7 +1142,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, break; } - stream_list_join(t, s, PENDING_CALLBACKS); + maybe_finish_read(t, s); } } if (!id) send_rst = 0; @@ -1150,8 +1181,14 @@ static void drop_connection(transport *t) { end_all_the_calls(t); } +static void maybe_finish_read(transport *t, stream *s) { + if (s->incoming_sopb) { + stream_list_join(t, s, FINISHED_READ_OP); + } +} + static void maybe_join_window_updates(transport *t, stream *s) { - if (s->allow_window_updates && + if (s->incoming_sopb != NULL && s->incoming_window < t->settings[LOCAL_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] * @@ -1160,6 +1197,7 @@ static void maybe_join_window_updates(transport *t, stream *s) { } } +#if 0 static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp, int allow) { transport *t = (transport *)tp; @@ -1174,6 +1212,7 @@ static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp, } unlock(t); } +#endif static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { if (t->incoming_frame_size > t->incoming_window) { @@ -1271,7 +1310,6 @@ static void on_header(void *tp, grpc_mdelem *md) { grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); - stream_list_join(t, s, PENDING_CALLBACKS); if (md->key == t->str_grpc_timeout) { gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); if (!cached_timeout) { @@ -1290,6 +1328,7 @@ static void on_header(void *tp, grpc_mdelem *md) { } else { add_incoming_metadata(t, s, md); } + maybe_finish_read(t, s); } static int init_header_frame_parser(transport *t, int is_continuation) { @@ -1464,8 +1503,6 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { return window + window_update < MAX_WINDOW; } -static void free_md(void *p, grpc_op_error result) { gpr_free(p); } - static void add_metadata_batch(transport *t, stream *s) { grpc_metadata_batch b; size_t i; @@ -1483,8 +1520,7 @@ static void add_metadata_batch(transport *t, stream *s) { s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL; grpc_sopb_add_metadata(&s->parser.incoming_sopb, b); - grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md, - s->incoming_metadata); + /* TODO(ctiller): don't leak incoming_metadata */ /* reset */ s->incoming_deadline = gpr_inf_future; @@ -1501,14 +1537,14 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { case GRPC_CHTTP2_PARSE_OK: if (st.end_of_stream) { t->incoming_stream->read_closed = 1; - stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + maybe_finish_read(t, t->incoming_stream); } if (st.need_flush_reads) { - stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + maybe_finish_read(t, t->incoming_stream); } if (st.metadata_boundary) { add_metadata_batch(t, t->incoming_stream); - stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + maybe_finish_read(t, t->incoming_stream); } if (st.ack_settings) { gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); @@ -1549,7 +1585,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { int was_window_empty = s->outgoing_window <= 0; s->outgoing_window += st.initial_window_update; if (was_window_empty && s->outgoing_window > 0 && - s->outgoing_sopb.nops > 0) { + s->outgoing_sopb && s->outgoing_sopb->nops > 0) { stream_list_join(t, s, WRITABLE); } } @@ -1568,7 +1604,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { s->outgoing_window += st.window_update; /* if this window update makes outgoing ops writable again, flag that */ - if (was_window_empty && s->outgoing_sopb.nops) { + if (was_window_empty && s->outgoing_sopb && s->outgoing_sopb->nops > 0) { stream_list_join(t, s, WRITABLE); } } @@ -1830,53 +1866,71 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, return GRPC_STREAM_OPEN; } -static int prepare_callbacks(transport *t) { +static void finish_reads(transport *t) { stream *s; - int n = 0; - while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) { - int execute = 1; - - s->callback_state = compute_state(s->sent_write_closed, s->read_closed); - if (s->callback_state == GRPC_STREAM_CLOSED) { - remove_from_stream_map(t, s); - if (s->published_close) { - execute = 0; - } else if (s->incoming_metadata_count) { - add_metadata_batch(t, s); - } - s->published_close = 1; - } - grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb); - - if (execute) { - stream_list_add_tail(t, s, EXECUTING_CALLBACKS); - n = 1; + while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { + int publish = 0; + GPR_ASSERT(s->incoming_sopb); + *s->publish_state = compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, + s->read_closed); + if (*s->publish_state != s->published_state) { + s->published_state = *s->publish_state; + publish = 1; + } + if (s->parser.incoming_sopb.nops > 0) { + grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb); + publish = 1; } + if (publish) { + schedule_cb(t, s->recv_done_closure, 1); + } + } +} + +static void schedule_cb(transport *t, op_closure closure, int success) { + if (t->pending_callbacks.capacity == t->pending_callbacks.count) { + t->pending_callbacks.capacity = GPR_MAX(t->pending_callbacks.capacity * 2, 8); + t->pending_callbacks.callbacks = gpr_realloc(t->pending_callbacks.callbacks, t->pending_callbacks.capacity * sizeof(*t->pending_callbacks.callbacks)); } - return n; + closure.success = success; + t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure; +} + +static int prepare_callbacks(transport *t) { + op_closure_array temp = t->pending_callbacks; + t->pending_callbacks = t->executing_callbacks; + t->executing_callbacks = temp; + return t->executing_callbacks.count > 0; } static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { - stream *s; - while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) { - size_t nops = s->callback_sopb.nops; - s->callback_sopb.nops = 0; - cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s, - s->callback_sopb.ops, nops, s->callback_state); + size_t i; + for (i = 0; i < t->executing_callbacks.count; i++) { + op_closure c = t->executing_callbacks.callbacks[i]; + c.cb(c.user_data, c.success); } + t->executing_callbacks.count = 0; } static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) { cb->closed(t->cb_user_data, &t->base); } -static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { - transport *t = (transport *)gt; - lock(t); +/* + * POLLSET STUFF + */ + +static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) { if (t->ep) { grpc_endpoint_add_to_pollset(t->ep, pollset); } +} + +static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { + transport *t = (transport *)gt; + lock(t); + add_to_pollset_locked(t, pollset); unlock(t); } @@ -1885,8 +1939,8 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { */ static const grpc_transport_vtable vtable = { - sizeof(stream), init_stream, send_batch, set_allow_window_updates, - add_to_pollset, destroy_stream, abort_stream, goaway, close_transport, + sizeof(stream), init_stream, perform_op, + add_to_pollset, destroy_stream, goaway, close_transport, send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index 882c078d51..ea22b0e1c8 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -81,9 +81,6 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) { case GRPC_OP_METADATA: grpc_metadata_batch_destroy(&ops[i].data.metadata); break; - case GRPC_OP_FLOW_CTL_CB: - ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR); - break; case GRPC_NO_OP: case GRPC_OP_BEGIN_MESSAGE: break; @@ -119,6 +116,7 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) { assert_contained_metadata_ok(sopb->ops, sopb->nops); + GPR_ASSERT(sopb->nops <= sopb->capacity); if (sopb->nops == sopb->capacity) { expandto(sopb, GROW(sopb->capacity)); } @@ -158,16 +156,6 @@ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) { assert_contained_metadata_ok(sopb->ops, sopb->nops); } -void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, - void (*cb)(void *arg, grpc_op_error error), - void *arg) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_FLOW_CTL_CB; - op->data.flow_ctl_cb.cb = cb; - op->data.flow_ctl_cb.arg = arg; - assert_contained_metadata_ok(sopb->ops, sopb->nops); -} - void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, size_t nops) { size_t orig_nops = sopb->nops; diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 20146b9af2..f5de64d583 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -55,9 +55,7 @@ typedef enum grpc_stream_op_code { GRPC_OP_BEGIN_MESSAGE, /* Add a slice of data to the current message/metadata element/status. Must not overflow the forward declared length. */ - GRPC_OP_SLICE, - /* Call some function once this operation has passed flow control. */ - GRPC_OP_FLOW_CTL_CB + GRPC_OP_SLICE } grpc_stream_op_code; /* Arguments for GRPC_OP_BEGIN */ @@ -68,12 +66,6 @@ typedef struct grpc_begin_message { gpr_uint32 flags; } grpc_begin_message; -/* Arguments for GRPC_OP_FLOW_CTL_CB */ -typedef struct grpc_flow_ctl_cb { - void (*cb)(void *arg, grpc_op_error error); - void *arg; -} grpc_flow_ctl_cb; - typedef struct grpc_linked_mdelem { grpc_mdelem *md; struct grpc_linked_mdelem *next; @@ -129,7 +121,6 @@ typedef struct grpc_stream_op { grpc_begin_message begin_message; grpc_metadata_batch metadata; gpr_slice slice; - grpc_flow_ctl_cb flow_ctl_cb; } data; } grpc_stream_op; @@ -163,12 +154,10 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata); /* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice); -/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */ -void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, - void (*cb)(void *arg, grpc_op_error error), - void *arg); /* Append a buffer to a buffer - does not ref/unref any internal objects */ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, size_t nops); +char *grpc_sopb_string(grpc_stream_op_buffer *sopb); + #endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */ diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index ef0020dc58..35195348e7 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -56,14 +56,9 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, return transport->vtable->init_stream(transport, stream, server_data); } -void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream, - grpc_stream_op *ops, size_t nops, int is_last) { - transport->vtable->send_batch(transport, stream, ops, nops, is_last); -} - -void grpc_transport_set_allow_window_updates(grpc_transport *transport, - grpc_stream *stream, int allow) { - transport->vtable->set_allow_window_updates(transport, stream, allow); +void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, + grpc_transport_op *op) { + transport->vtable->perform_op(transport, stream, op); } void grpc_transport_add_to_pollset(grpc_transport *transport, @@ -76,11 +71,6 @@ void grpc_transport_destroy_stream(grpc_transport *transport, transport->vtable->destroy_stream(transport, stream); } -void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream, - grpc_status_code status) { - transport->vtable->abort_stream(transport, stream, status); -} - void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data), void *user_data) { transport->vtable->ping(transport, cb, user_data); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index ce8c17c322..d0007680e3 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -62,24 +62,6 @@ typedef enum grpc_stream_state { /* Callbacks made from the transport to the upper layers of grpc. */ struct grpc_transport_callbacks { - /* Allocate a buffer to receive data into. - It's safe to call grpc_slice_new() to do this, but performance minded - proxies may want to carefully place data into optimal locations for - transports. - This function must return a valid, non-empty slice. - - Arguments: - user_data - the transport user data set at transport creation time - transport - the grpc_transport instance making this call - stream - the grpc_stream instance the buffer will be used for, or - NULL if this is not known - size_hint - how big of a buffer would the transport optimally like? - the actual returned buffer can be smaller or larger than - size_hint as the implementation finds convenient */ - struct gpr_slice (*alloc_recv_buffer)(void *user_data, - grpc_transport *transport, - grpc_stream *stream, size_t size_hint); - /* Initialize a new stream on behalf of the transport. Must result in a call to grpc_transport_init_stream(transport, ..., request) in the same call @@ -96,30 +78,7 @@ struct grpc_transport_callbacks { void (*accept_stream)(void *user_data, grpc_transport *transport, const void *server_data); - /* Process a set of stream ops that have been received by the transport. - Called by network threads, so must be careful not to block on network - activity. - - If final_state == GRPC_STREAM_CLOSED, the upper layers should arrange to - call grpc_transport_destroy_stream. - - Ownership of any objects contained in ops is transferred to the callee. - - Arguments: - user_data - the transport user data set at transport creation time - transport - the grpc_transport instance making this call - stream - the stream this data was received for - ops - stream operations that are part of this batch - ops_count - the number of stream operations in this batch - final_state - the state of the stream as of the final operation in this - batch */ - void (*recv_batch)(void *user_data, grpc_transport *transport, - grpc_stream *stream, grpc_stream_op *ops, size_t ops_count, - grpc_stream_state final_state); - - /* The transport received a goaway */ - void (*goaway)(void *user_data, grpc_transport *transport, - grpc_status_code status, gpr_slice debug); + void (*goaway)(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug); /* The transport has been closed */ void (*closed)(void *user_data, grpc_transport *transport); @@ -154,20 +113,29 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream); -/* Enable/disable incoming data for a stream. +/* Transport op: a set of operations to perform on a transport */ +typedef struct grpc_transport_op { + grpc_stream_op_buffer *send_ops; + int is_last_send; + void (*on_done_send)(void *user_data, int success); + void *send_user_data; - This effectively disables new window becoming available for a given stream, - but does not prevent existing window from being consumed by a sender: the - caller must still be prepared to receive some additional data after this - call. + grpc_stream_op_buffer *recv_ops; + grpc_stream_state *recv_state; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; - Arguments: - transport - the transport on which to create this stream - stream - the grpc_stream to destroy (memory is still owned by the - caller, but any child memory must be cleaned up) - allow - is it allowed that new window be opened up? */ -void grpc_transport_set_allow_window_updates(grpc_transport *transport, - grpc_stream *stream, int allow); + grpc_pollset *bind_pollset; + + grpc_status_code cancel_with_status; +} grpc_transport_op; + +void grpc_transport_op_finish_with_failure(grpc_transport_op *op); + +/* TODO(ctiller): remove this */ +void grpc_transport_add_to_pollset(grpc_transport *transport, grpc_pollset *pollset); + +char *grpc_transport_op_string(grpc_transport_op *op); /* Send a batch of operations on a transport @@ -177,13 +145,9 @@ void grpc_transport_set_allow_window_updates(grpc_transport *transport, transport - the transport on which to initiate the stream stream - the stream on which to send the operations. This must be non-NULL and previously initialized by the same transport. - ops - an array of operations to apply to the stream - can be NULL - if ops_count == 0. - ops_count - the number of elements in ops - is_last - is this the last batch of operations to be sent out */ -void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream, - grpc_stream_op *ops, size_t ops_count, - int is_last); + op - a grpc_transport_op specifying the op to perform */ +void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, + grpc_transport_op *op); /* Send a ping on a transport @@ -193,19 +157,6 @@ void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream, void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data), void *user_data); -/* Abort a stream - - Terminate reading and writing for a stream. A final recv_batch with no - operations and final_state == GRPC_STREAM_CLOSED will be received locally, - and no more data will be presented to the up-layer. - - TODO(ctiller): consider adding a HTTP/2 reason to this function. */ -void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream, - grpc_status_code status); - -void grpc_transport_add_to_pollset(grpc_transport *transport, - grpc_pollset *pollset); - /* Advise peer of pending connection termination. */ void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status, gpr_slice debug_data); diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index ac275c7560..ef79ac0741 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -46,12 +46,8 @@ typedef struct grpc_transport_vtable { const void *server_data); /* implementation of grpc_transport_send_batch */ - void (*send_batch)(grpc_transport *self, grpc_stream *stream, - grpc_stream_op *ops, size_t ops_count, int is_last); - - /* implementation of grpc_transport_set_allow_window_updates */ - void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream, - int allow); + void (*perform_op)(grpc_transport *self, grpc_stream *stream, + grpc_transport_op *op); /* implementation of grpc_transport_add_to_pollset */ void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset); @@ -59,10 +55,6 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_transport *self, grpc_stream *stream); - /* implementation of grpc_transport_abort_stream */ - void (*abort_stream)(grpc_transport *self, grpc_stream *stream, - grpc_status_code status); - /* implementation of grpc_transport_goaway */ void (*goaway)(grpc_transport *self, grpc_status_code status, gpr_slice debug_data); diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c new file mode 100644 index 0000000000..e886690234 --- /dev/null +++ b/src/core/transport/transport_op_string.c @@ -0,0 +1,152 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/channel_stack.h" + +#include <stdarg.h> +#include <stdio.h> +#include <string.h> + +#include "src/core/support/string.h" +#include <grpc/support/alloc.h> +#include <grpc/support/useful.h> + +static void put_metadata(gpr_strvec *b, grpc_mdelem *md) { + gpr_strvec_add(b, gpr_strdup(" key=")); + gpr_strvec_add( + b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice), + GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT)); + + gpr_strvec_add(b, gpr_strdup(" value=")); + gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice), + GPR_SLICE_LENGTH(md->value->slice), + GPR_HEXDUMP_PLAINTEXT)); +} + +static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { + grpc_linked_mdelem *m; + for (m = md.list.head; m != NULL; m = m->next) { + put_metadata(b, m->md); + } + if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) { + char *tmp; + gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec, + md.deadline.tv_nsec); + gpr_strvec_add(b, tmp); + } +} + +char *grpc_sopb_string(grpc_stream_op_buffer *sopb) { + char *out; + char *tmp; + size_t i; + gpr_strvec b; + gpr_strvec_init(&b); + + for (i = 0; i < sopb->nops; i++) { + grpc_stream_op *op = &sopb->ops[i]; + if (i) gpr_strvec_add(&b, gpr_strdup(", ")); + switch (op->type) { + case GRPC_NO_OP: + gpr_strvec_add(&b, gpr_strdup("NO_OP")); + break; + case GRPC_OP_BEGIN_MESSAGE: + gpr_asprintf(&tmp, "BEGIN_MESSAGE:%d", op->data.begin_message.length); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_SLICE: + gpr_asprintf(&tmp, "SLICE:%d", GPR_SLICE_LENGTH(op->data.slice)); + break; + case GRPC_OP_METADATA: + put_metadata_list(&b, op->data.metadata); + break; + } + } + + out = gpr_strvec_flatten(&b, NULL); + gpr_strvec_destroy(&b); + + return out; +} + +char *grpc_transport_op_string(grpc_transport_op *op) { + char *tmp; + char *out; + int first = 1; + + gpr_strvec b; + gpr_strvec_init(&b); + + if (op->send_ops) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_strvec_add(&b, gpr_strdup("SEND")); + if (op->is_last_send) { + gpr_strvec_add(&b, gpr_strdup("_LAST")); + } + gpr_strvec_add(&b, gpr_strdup("[")); + gpr_strvec_add(&b, grpc_sopb_string(op->send_ops)); + gpr_strvec_add(&b, gpr_strdup("]")); + } + + if (op->recv_ops) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_strvec_add(&b, gpr_strdup("RECV")); + } + + if (op->bind_pollset) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_strvec_add(&b, gpr_strdup("BIND")); + } + + if (op->cancel_with_status != GRPC_STATUS_OK) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status); + gpr_strvec_add(&b, tmp); + } + + out = gpr_strvec_flatten(&b, NULL); + gpr_strvec_destroy(&b); + + return out; +} + +void grpc_call_log_op(char *file, int line, gpr_log_severity severity, + grpc_call_element *elem, grpc_transport_op *op) { + char *str = grpc_transport_op_string(op); + gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str); + gpr_free(str); +} |