diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 546 | ||||
-rw-r--r-- | src/core/surface/call.h | 15 | ||||
-rw-r--r-- | src/core/surface/channel.c | 27 | ||||
-rw-r--r-- | src/core/surface/channel.h | 3 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 5 | ||||
-rw-r--r-- | src/core/surface/client.c | 36 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 2 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 31 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 5 | ||||
-rw-r--r-- | src/core/surface/server.c | 145 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 5 |
11 files changed, 445 insertions, 375 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 2949805622..8f1dfa75b8 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -81,9 +81,9 @@ typedef struct { grpc_ioreq_completion_func on_complete; void *user_data; /* a bit mask of which request ops are needed (1u << opid) */ - gpr_uint32 need_mask; + gpr_uint16 need_mask; /* a bit mask of which request ops are now completed */ - gpr_uint32 complete_mask; + gpr_uint16 complete_mask; } reqinfo_master; /* Status data for a request can come from several sources; this @@ -144,12 +144,17 @@ struct grpc_call { gpr_uint8 have_alarm; /* are we currently performing a send operation */ gpr_uint8 sending; + /* are we currently performing a recv operation */ + gpr_uint8 receiving; /* are we currently completing requests */ gpr_uint8 completing; /* pairs with completed_requests */ gpr_uint8 num_completed_requests; - /* flag that we need to request more data */ - gpr_uint8 need_more_data; + /* are we currently reading a message? */ + gpr_uint8 reading_message; + /* flags with bits corresponding to write states allowing us to determine + what was sent */ + gpr_uint16 last_send_contains; /* Active ioreqs. request_set and request_data contain one element per active ioreq @@ -214,6 +219,13 @@ struct grpc_call { size_t send_initial_metadata_count; gpr_timespec send_deadline; + grpc_stream_op_buffer send_ops; + grpc_stream_op_buffer recv_ops; + grpc_stream_state recv_state; + + gpr_slice_buffer incoming_message; + gpr_uint32 incoming_message_length; + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; @@ -234,9 +246,13 @@ struct grpc_call { } while (0) static void do_nothing(void *ignored, grpc_op_error also_ignored) {} -static send_action choose_send_action(grpc_call *call); -static void enact_send_action(grpc_call *call, send_action sa); static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); +static void call_on_done_recv(void *call, int success); +static void call_on_done_send(void *call, int success); +static int fill_send_ops(grpc_call *call, grpc_transport_op *op); +static void execute_op(grpc_call *call, grpc_transport_op *op); +static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); +static void finish_read_ops(grpc_call *call); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data, @@ -244,6 +260,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, size_t add_initial_metadata_count, gpr_timespec send_deadline) { size_t i; + grpc_transport_op initial_op; + grpc_transport_op *initial_op_ptr = NULL; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); @@ -267,10 +285,24 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, call->send_deadline = send_deadline; grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); - /* one ref is dropped in response to destroy, the other in - stream_closed */ - gpr_ref_init(&call->internal_refcount, 2); - grpc_call_stack_init(channel_stack, server_transport_data, + grpc_sopb_init(&call->send_ops); + grpc_sopb_init(&call->recv_ops); + gpr_slice_buffer_init(&call->incoming_message); + /* dropped in destroy */ + gpr_ref_init(&call->internal_refcount, 1); + /* server hack: start reads immediately so we can get initial metadata. + TODO(ctiller): figure out a cleaner solution */ + if (!call->is_client) { + memset(&initial_op, 0, sizeof(initial_op)); + initial_op.recv_ops = &call->recv_ops; + initial_op.recv_state = &call->recv_state; + initial_op.on_done_recv = call_on_done_recv; + initial_op.recv_user_data = call; + call->receiving = 1; + grpc_call_internal_ref(call); + initial_op_ptr = &initial_op; + } + grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, CALL_STACK_FROM_CALL(call)); if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) { set_deadline_alarm(call, send_deadline); @@ -287,7 +319,8 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { return call->cq; } -void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } +void grpc_call_internal_ref(grpc_call *c) { + gpr_ref(&c->internal_refcount); } static void destroy_call(void *call, int ignored_success) { size_t i; @@ -310,10 +343,13 @@ static void destroy_call(void *call, int ignored_success) { for (i = 0; i < c->send_initial_metadata_count; i++) { grpc_mdelem_unref(c->send_initial_metadata[i].md); } + grpc_sopb_destroy(&c->send_ops); + grpc_sopb_destroy(&c->recv_ops); if (c->legacy_state) { destroy_legacy_state(c->legacy_state); } grpc_bbq_destroy(&c->incoming_queue); + gpr_slice_buffer_destroy(&c->incoming_message); gpr_free(c); } @@ -359,20 +395,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { return GRPC_CALL_OK; } -static void request_more_data(grpc_call *call) { - grpc_call_op op; - - /* call down */ - op.type = GRPC_REQUEST_DATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - op.bind_pollset = NULL; - - grpc_call_execute_op(call, &op); -} - static int is_op_live(grpc_call *call, grpc_ioreq_op op) { gpr_uint8 set = call->request_set[op]; reqinfo_master *master; @@ -383,17 +405,41 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) { static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } +static int need_more_data(grpc_call *call) { + return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || + is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) || + is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || + is_op_live(call, GRPC_IOREQ_RECV_STATUS) || + is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || + (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || + (call->write_state == WRITE_STATE_INITIAL && !call->is_client && call->read_state != READ_STATE_STREAM_CLOSED); +} + static void unlock(grpc_call *call) { - send_action sa = SEND_NOTHING; + grpc_transport_op op; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int completing_requests = 0; - int need_more_data = - call->need_more_data && - (call->write_state >= WRITE_STATE_STARTED || !call->is_client); + int start_op = 0; int i; - if (need_more_data) { - call->need_more_data = 0; + memset(&op, 0, sizeof(op)); + + if (!call->receiving && need_more_data(call)) { + op.recv_ops = &call->recv_ops; + op.recv_state = &call->recv_state; + op.on_done_recv = call_on_done_recv; + op.recv_user_data = call; + call->receiving = 1; + grpc_call_internal_ref(call); + start_op = 1; + } + + if (!call->sending) { + if (fill_send_ops(call, &op)) { + call->sending = 1; + grpc_call_internal_ref(call); + start_op = 1; + } } if (!call->completing && call->num_completed_requests != 0) { @@ -405,22 +451,10 @@ static void unlock(grpc_call *call) { grpc_call_internal_ref(call); } - if (!call->sending) { - sa = choose_send_action(call); - if (sa != SEND_NOTHING) { - call->sending = 1; - grpc_call_internal_ref(call); - } - } - gpr_mu_unlock(&call->mu); - if (need_more_data) { - request_more_data(call); - } - - if (sa != SEND_NOTHING) { - enact_send_action(call, sa); + if (start_op) { + execute_op(call, &op); } if (completing_requests > 0) { @@ -495,7 +529,6 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, master->complete_mask |= 1u << op; if (status != GRPC_OP_OK) { master->status = status; - master->complete_mask = master->need_mask; } if (master->complete_mask == master->need_mask) { for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { @@ -554,64 +587,144 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, } } -static void finish_send_op(grpc_call *call, grpc_ioreq_op op, write_state ws, - grpc_op_error error) { +static void call_on_done_send(void *pc, int success) { + grpc_call *call = pc; + grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR; lock(call); - finish_ioreq_op(call, op, error); + if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error); + } + if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) { + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error); + } + if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) { + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, error); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, error); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK); + } + call->last_send_contains = 0; call->sending = 0; - call->write_state = ws; unlock(call); grpc_call_internal_unref(call, 0); } -static void finish_write_step(void *pc, grpc_op_error error) { - finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, WRITE_STATE_STARTED, error); +static void finish_message(grpc_call *call) { + /* TODO(ctiller): this could be a lot faster if coded directly */ + grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create( + call->incoming_message.slices, call->incoming_message.count); + gpr_slice_buffer_reset_and_unref(&call->incoming_message); + + grpc_bbq_push(&call->incoming_queue, byte_buffer); + + GPR_ASSERT(call->incoming_message.count == 0); + call->reading_message = 0; } -static void finish_finish_step(void *pc, grpc_op_error error) { - finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, WRITE_STATE_WRITE_CLOSED, error); +static int begin_message(grpc_call *call, grpc_begin_message msg) { + /* can't begin a message when we're still reading a message */ + if (call->reading_message) { + char *message = NULL; + gpr_asprintf( + &message, "Message terminated early; read %d bytes, expected %d", + (int)call->incoming_message.length, (int)call->incoming_message_length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } + /* stash away parameters, and prepare for incoming slices */ + if (msg.length > grpc_channel_get_max_message_length(call->channel)) { + char *message = NULL; + gpr_asprintf( + &message, + "Maximum message length of %d exceeded by a message of length %d", + grpc_channel_get_max_message_length(call->channel), msg.length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } else if (msg.length > 0) { + call->reading_message = 1; + call->incoming_message_length = msg.length; + return 1; + } else { + finish_message(call); + return 1; + } } -static void finish_start_step(void *pc, grpc_op_error error) { - finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, WRITE_STATE_STARTED, - error); +static int add_slice_to_message(grpc_call *call, gpr_slice slice) { + if (GPR_SLICE_LENGTH(slice) == 0) { + gpr_slice_unref(slice); + return 1; + } + /* we have to be reading a message to know what to do here */ + if (!call->reading_message) { + grpc_call_cancel_with_status( + call, GRPC_STATUS_INVALID_ARGUMENT, + "Received payload data while not reading a message"); + return 0; + } + /* append the slice to the incoming buffer */ + gpr_slice_buffer_add(&call->incoming_message, slice); + if (call->incoming_message.length > call->incoming_message_length) { + /* if we got too many bytes, complain */ + char *message = NULL; + gpr_asprintf( + &message, "Receiving message overflow; read %d bytes, expected %d", + (int)call->incoming_message.length, (int)call->incoming_message_length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } else if (call->incoming_message.length == call->incoming_message_length) { + finish_message(call); + return 1; + } else { + return 1; + } } -static send_action choose_send_action(grpc_call *call) { - switch (call->write_state) { - case WRITE_STATE_INITIAL: - if (is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) { - if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE) || - is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { - return SEND_BUFFERED_INITIAL_METADATA; - } else { - return SEND_INITIAL_METADATA; - } - } - return SEND_NOTHING; - case WRITE_STATE_STARTED: - if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { - if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { - return SEND_BUFFERED_MESSAGE; - } else { - return SEND_MESSAGE; - } - } else if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK); - if (call->is_client) { - return SEND_FINISH; - } else { - return SEND_TRAILING_METADATA_AND_FINISH; - } +static void call_on_done_recv(void *pc, int success) { + grpc_call *call = pc; + size_t i; + lock(call); + call->receiving = 0; + if (success) { + for (i = 0; success && i < call->recv_ops.nops; i++) { + grpc_stream_op *op = &call->recv_ops.ops[i]; + switch (op->type) { + case GRPC_NO_OP: + break; + case GRPC_OP_METADATA: + recv_metadata(call, &op->data.metadata); + break; + case GRPC_OP_BEGIN_MESSAGE: + success = begin_message(call, op->data.begin_message); + break; + case GRPC_OP_SLICE: + success = add_slice_to_message(call, op->data.slice); + break; } - return SEND_NOTHING; - case WRITE_STATE_WRITE_CLOSED: - return SEND_NOTHING; + } + if (call->recv_state == GRPC_STREAM_RECV_CLOSED) { + GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED); + call->read_state = READ_STATE_READ_CLOSED; + } + if (call->recv_state == GRPC_STREAM_CLOSED) { + GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); + call->read_state = READ_STATE_STREAM_CLOSED; + } + finish_read_ops(call); + } else { + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR); } - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - return SEND_NOTHING; + call->recv_ops.nops = 0; + unlock(call); + + grpc_call_internal_unref(call, 0); } static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, @@ -639,97 +752,102 @@ static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, return out; } -static void enact_send_action(grpc_call *call, send_action sa) { +/* Copy the contents of a byte buffer into stream ops */ +static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, + grpc_stream_op_buffer *sopb) { + size_t i; + + switch (byte_buffer->type) { + case GRPC_BB_SLICE_BUFFER: + for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) { + gpr_slice slice = byte_buffer->data.slice_buffer.slices[i]; + gpr_slice_ref(slice); + grpc_sopb_add_slice(sopb, slice); + } + break; + } +} + +static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { grpc_ioreq_data data; - grpc_call_op op; + grpc_metadata_batch mdb; size_t i; - gpr_uint32 flags = 0; char status_str[GPR_LTOA_MIN_BUFSIZE]; + GPR_ASSERT(op->send_ops == NULL); - switch (sa) { - case SEND_NOTHING: - abort(); - break; - case SEND_BUFFERED_INITIAL_METADATA: - flags |= GRPC_WRITE_BUFFER_HINT; - /* fallthrough */ - case SEND_INITIAL_METADATA: + switch (call->write_state) { + case WRITE_STATE_INITIAL: + if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) { + break; + } data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.metadata.list = chain_metadata_from_app( - call, data.send_metadata.count, data.send_metadata.metadata); - op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; - op.data.metadata.deadline = call->send_deadline; + mdb.list = chain_metadata_from_app(call, data.send_metadata.count, + data.send_metadata.metadata); + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = call->send_deadline; for (i = 0; i < call->send_initial_metadata_count; i++) { - grpc_metadata_batch_link_head(&op.data.metadata, - &call->send_initial_metadata[i]); + grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]); } + grpc_sopb_add_metadata(&call->send_ops, mdb); + op->send_ops = &call->send_ops; + op->bind_pollset = grpc_cq_pollset(call->cq); + call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; + call->write_state = WRITE_STATE_STARTED; call->send_initial_metadata_count = 0; - op.done_cb = finish_start_step; - op.user_data = call; - op.bind_pollset = grpc_cq_pollset(call->cq); - grpc_call_execute_op(call, &op); - break; - case SEND_BUFFERED_MESSAGE: - flags |= GRPC_WRITE_BUFFER_HINT; - /* fallthrough */ - case SEND_MESSAGE: - data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.message = data.send_message; - op.done_cb = finish_write_step; - op.user_data = call; - op.bind_pollset = NULL; - grpc_call_execute_op(call, &op); - break; - case SEND_TRAILING_METADATA_AND_FINISH: - /* send trailing metadata */ - data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.metadata.list = chain_metadata_from_app( - call, data.send_metadata.count, data.send_metadata.metadata); - op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; - op.data.metadata.deadline = call->send_deadline; - op.bind_pollset = NULL; - /* send status */ - /* TODO(ctiller): cache common status values */ - data = call->request_data[GRPC_IOREQ_SEND_STATUS]; - gpr_ltoa(data.send_status.code, status_str); - grpc_metadata_batch_add_tail( - &op.data.metadata, &call->status_link, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, status_str))); - if (data.send_status.details) { - grpc_metadata_batch_add_tail( - &op.data.metadata, &call->details_link, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, - data.send_status.details))); + /* fall through intended */ + case WRITE_STATE_STARTED: + if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { + data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; + grpc_sopb_add_begin_message( + &call->send_ops, grpc_byte_buffer_length(data.send_message), 0); + copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops); + op->send_ops = &call->send_ops; + call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE; + } + if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { + op->is_last_send = 1; + op->send_ops = &call->send_ops; + call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE; + call->write_state = WRITE_STATE_WRITE_CLOSED; + if (!call->is_client) { + /* send trailing metadata */ + data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; + mdb.list = chain_metadata_from_app(call, data.send_metadata.count, + data.send_metadata.metadata); + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = gpr_inf_future; + /* send status */ + /* TODO(ctiller): cache common status values */ + data = call->request_data[GRPC_IOREQ_SEND_STATUS]; + gpr_ltoa(data.send_status.code, status_str); + grpc_metadata_batch_add_tail( + &mdb, &call->status_link, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, status_str))); + if (data.send_status.details) { + grpc_metadata_batch_add_tail( + &mdb, &call->details_link, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref( + grpc_channel_get_message_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, + data.send_status.details))); + } + grpc_sopb_add_metadata(&call->send_ops, mdb); + } } - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - /* fallthrough: see choose_send_action for details */ - case SEND_FINISH: - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = finish_finish_step; - op.user_data = call; - op.bind_pollset = NULL; - grpc_call_execute_op(call, &op); break; + case WRITE_STATE_WRITE_CLOSED: + break; + } + if (op->send_ops) { + op->on_done_send = call_on_done_send; + op->send_user_data = call; } + return op->send_ops != NULL; } static grpc_call_error start_ioreq_error(grpc_call *call, @@ -838,10 +956,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, master->on_complete = completion; master->user_data = user_data; - if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) { - call->need_more_data = 1; - } - finish_read_ops(call); early_out_write_ops(call); @@ -871,41 +985,34 @@ void grpc_call_destroy(grpc_call *c) { grpc_call_internal_unref(c, 1); } -grpc_call_error grpc_call_cancel(grpc_call *c) { - grpc_call_element *elem; - grpc_call_op op; - - op.type = GRPC_CANCEL_OP; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - op.bind_pollset = NULL; - - elem = CALL_ELEM_FROM_CALL(c, 0); - elem->filter->call_op(elem, NULL, &op); - - return GRPC_CALL_OK; +grpc_call_error grpc_call_cancel(grpc_call *call) { + return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled"); } grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, const char *description) { + grpc_transport_op op; grpc_mdstr *details = description ? grpc_mdstr_from_string(c->metadata_context, description) : NULL; + memset(&op, 0, sizeof(op)); + op.cancel_with_status = status; + lock(c); set_status_code(c, STATUS_FROM_API_OVERRIDE, status); set_status_details(c, STATUS_FROM_API_OVERRIDE, details); unlock(c); - return grpc_call_cancel(c); + + execute_op(c, &op); + + return GRPC_CALL_OK; } -void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { +static void execute_op(grpc_call *call, grpc_transport_op *op) { grpc_call_element *elem; - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, op); + elem->filter->start_transport_op(elem, op); } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { @@ -928,34 +1035,14 @@ static void call_alarm(void *arg, int success) { static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { if (call->have_alarm) { gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); + assert(0); + return; } grpc_call_internal_ref(call); call->have_alarm = 1; grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); } -static void set_read_state_locked(grpc_call *call, read_state state) { - GPR_ASSERT(call->read_state < state); - call->read_state = state; - finish_read_ops(call); -} - -static void set_read_state(grpc_call *call, read_state state) { - lock(call); - set_read_state_locked(call, state); - unlock(call); -} - -void grpc_call_read_closed(grpc_call_element *elem) { - set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED); -} - -void grpc_call_stream_closed(grpc_call_element *elem) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - set_read_state(call, READ_STATE_STREAM_CLOSED); - grpc_call_internal_unref(call, 0); -} - /* we offset status by a small amount when storing it into transport metadata as metadata cannot store a 0 value (which is used as OK for grpc_status_codes */ @@ -979,35 +1066,13 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { return status; } -void grpc_call_recv_message(grpc_call_element *elem, - grpc_byte_buffer *byte_buffer) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - grpc_bbq_push(&call->incoming_queue, byte_buffer); - finish_read_ops(call); - unlock(call); -} - -void grpc_call_recv_synthetic_status(grpc_call_element *elem, - grpc_status_code status, - const char *message) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - set_status_code(call, STATUS_FROM_CORE, status); - set_status_details(call, STATUS_FROM_CORE, - grpc_mdstr_from_string(call->metadata_context, message)); - unlock(call); -} - -int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); +static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { grpc_linked_mdelem *l; grpc_metadata_array *dest; grpc_metadata *mdusr; int is_trailing; grpc_mdctx *mdctx = call->metadata_context; - lock(call); is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; for (l = md->list.head; l != NULL; l = l->next) { grpc_mdelem *md = l->md; @@ -1043,9 +1108,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { set_deadline_alarm(call, md->deadline); } if (!is_trailing) { - set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA); + call->read_state = READ_STATE_GOT_INITIAL_METADATA; } - unlock(call); grpc_mdctx_lock(mdctx); for (l = md->list.head; l; l = l->next) { @@ -1055,8 +1119,6 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { grpc_mdctx_locked_mdelem_unref(mdctx, l->md); } grpc_mdctx_unlock(mdctx); - - return !is_trailing; } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { diff --git a/src/core/surface/call.h b/src/core/surface/call.h index f8d0915349..199beb1738 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -96,27 +96,12 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); void grpc_call_internal_ref(grpc_call *call); void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); -/* Helpers for grpc_client, grpc_server filters to publish received data to - the completion queue/surface layer */ -/* receive metadata - returns 1 if this was initial metadata */ -int grpc_call_recv_metadata(grpc_call_element *surface_element, - grpc_metadata_batch *md); -void grpc_call_recv_message(grpc_call_element *surface_element, - grpc_byte_buffer *message); -void grpc_call_read_closed(grpc_call_element *surface_element); -void grpc_call_stream_closed(grpc_call_element *surface_element); - -void grpc_call_execute_op(grpc_call *call, grpc_call_op *op); grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); -void grpc_call_recv_synthetic_status(grpc_call_element *elem, - grpc_status_code status, - const char *message); - /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 29b042e7c1..78f9144c19 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -52,6 +52,7 @@ typedef struct registered_call { struct grpc_channel { int is_client; gpr_refcount refs; + gpr_uint32 max_message_length; grpc_mdctx *metadata_context; grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_message_string; @@ -68,9 +69,13 @@ struct grpc_channel { #define CHANNEL_FROM_TOP_ELEM(top_elem) \ CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) +/* the protobuf library will (by default) start warning at 100megs */ +#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) + grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t num_filters, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { + size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); @@ -88,6 +93,24 @@ grpc_channel *grpc_channel_create_from_filters( CHANNEL_STACK_FROM_CHANNEL(channel)); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; + + channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + if (args) { + for (i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { + if (args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else if (args->args[i].value.integer < 0) { + gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else { + channel->max_message_length = args->args[i].value.integer; + } + } + } + } + return channel; } @@ -219,3 +242,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) { return channel->grpc_message_string; } + +gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) { + return channel->max_message_length; +} diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index d3e51185ee..388be35711 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -44,10 +44,11 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); +gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); void grpc_client_channel_closed(grpc_channel_element *elem); void grpc_channel_internal_ref(grpc_channel *channel); void grpc_channel_internal_unref(grpc_channel *channel); -#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */ +#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */ diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 3104b1d00d..daa8d3a7c6 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -44,7 +44,6 @@ #include "src/core/channel/client_setup.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" -#include "src/core/channel/http_filter.h" #include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" @@ -176,8 +175,8 @@ static void done_setup(void *sp) { static grpc_transport_setup_result complete_setup(void *channel_stack, grpc_transport *transport, grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter, - &grpc_http_filter}; + static grpc_channel_filter const *extra_filters[] = { + &grpc_http_client_filter}; return grpc_client_channel_transport_setup_complete( channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index 2f898ff7d7..8ac4dd1e0e 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -43,32 +43,10 @@ typedef struct { void *unused; } call_data; typedef struct { void *unused; } channel_data; -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void client_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - switch (op->type) { - case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, &op->data.metadata); - break; - case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message); - op->done_cb(op->user_data, GRPC_OP_OK); - break; - case GRPC_RECV_HALF_CLOSE: - grpc_call_read_closed(elem); - break; - case GRPC_RECV_FINISH: - grpc_call_stream_closed(elem); - break; - case GRPC_RECV_SYNTHETIC_STATUS: - grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status, - op->data.synthetic_status.message); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_call_next_op(elem, op); - } + grpc_call_next_op(elem, op); } static void channel_op(grpc_channel_element *elem, @@ -90,7 +68,8 @@ static void channel_op(grpc_channel_element *elem, } static void init_call_elem(grpc_call_element *elem, - const void *transport_server_data) {} + const void *transport_server_data, + grpc_transport_op *initial_op) {} static void destroy_call_elem(grpc_call_element *elem) {} @@ -104,6 +83,7 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} const grpc_channel_filter grpc_client_surface_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client", + client_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "client", }; diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 24f4a05071..f3c2453b5e 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -401,7 +401,9 @@ static void on_pollset_destroy_done(void *arg) { } void grpc_completion_queue_destroy(grpc_completion_queue *cc) { + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(cc->queue == NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 78170806f1..c93222344d 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -46,22 +46,10 @@ typedef struct { void *unused; } call_data; typedef struct { void *unused; } channel_data; -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void lame_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - switch (op->type) { - case GRPC_SEND_METADATA: - grpc_metadata_batch_destroy(&op->data.metadata); - grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN, - "Rpc sent on a lame channel."); - grpc_call_stream_closed(elem); - break; - default: - break; - } - - op->done_cb(op->user_data, GRPC_OP_ERROR); + grpc_transport_op_finish_with_failure(op); } static void channel_op(grpc_channel_element *elem, @@ -79,7 +67,12 @@ static void channel_op(grpc_channel_element *elem, } static void init_call_elem(grpc_call_element *elem, - const void *transport_server_data) {} + const void *transport_server_data, + grpc_transport_op *initial_op) { + if (initial_op) { + grpc_transport_op_finish_with_failure(initial_op); + } +} static void destroy_call_elem(grpc_call_element *elem) {} @@ -93,9 +86,9 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} static const grpc_channel_filter lame_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, - "lame-client", + lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "lame-client", }; grpc_channel *grpc_lame_client_channel_create(void) { diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index e7d223bfda..3e331293b5 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -44,7 +44,6 @@ #include "src/core/channel/client_setup.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" -#include "src/core/channel/http_filter.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" #include "src/core/security/auth.h" @@ -193,7 +192,7 @@ static grpc_transport_setup_result complete_setup(void *channel_stack, grpc_transport *transport, grpc_mdctx *mdctx) { static grpc_channel_filter const *extra_filters[] = { - &grpc_client_auth_filter, &grpc_http_client_filter, &grpc_http_filter}; + &grpc_client_auth_filter, &grpc_http_client_filter}; return grpc_client_channel_transport_setup_complete( channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); @@ -211,7 +210,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, grpc_arg connector_arg; grpc_channel_args *args_copy; grpc_channel_args *new_args_from_connector; - grpc_channel_security_connector* connector; + grpc_channel_security_connector *connector; grpc_mdctx *mdctx; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 3b129039bb..3cc8f9cbb9 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -173,13 +173,19 @@ struct call_data { grpc_call *call; call_state state; - gpr_timespec deadline; grpc_mdstr *path; grpc_mdstr *host; + gpr_timespec deadline; + int got_initial_metadata; legacy_data *legacy; grpc_completion_queue *cq_new; + grpc_stream_op_buffer *recv_ops; + grpc_stream_state *recv_state; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; + call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; }; @@ -371,46 +377,6 @@ static void kill_zombie(void *elem, int success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } -static void stream_closed(grpc_call_element *elem) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->server->mu); - switch (calld->state) { - case ACTIVATED: - break; - case PENDING: - call_list_remove(calld, PENDING_START); - /* fallthrough intended */ - case NOT_STARTED: - calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); - break; - case ZOMBIED: - break; - } - gpr_mu_unlock(&chand->server->mu); - grpc_call_stream_closed(elem); -} - -static void read_closed(grpc_call_element *elem) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->server->mu); - switch (calld->state) { - case ACTIVATED: - case PENDING: - grpc_call_read_closed(elem); - break; - case NOT_STARTED: - calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); - break; - case ZOMBIED: - break; - } - gpr_mu_unlock(&chand->server->mu); -} - static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; channel_data *chand = elem->channel_data; @@ -425,33 +391,75 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, - grpc_call_op *op) { +static void server_on_recv(void *ptr, int success) { + grpc_call_element *elem = ptr; call_data *calld = elem->call_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - switch (op->type) { - case GRPC_RECV_METADATA: + channel_data *chand = elem->channel_data; + + if (success && !calld->got_initial_metadata) { + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - if (grpc_call_recv_metadata(elem, &op->data.metadata)) { + if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) { calld->deadline = op->data.metadata.deadline; - start_new_rpc(elem); } + calld->got_initial_metadata = 1; + start_new_rpc(elem); break; - case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message); - op->done_cb(op->user_data, GRPC_OP_OK); + } + } + + switch (*calld->recv_state) { + case GRPC_STREAM_OPEN: break; - case GRPC_RECV_HALF_CLOSE: - read_closed(elem); + case GRPC_STREAM_SEND_CLOSED: break; - case GRPC_RECV_FINISH: - stream_closed(elem); + case GRPC_STREAM_RECV_CLOSED: + gpr_mu_lock(&chand->server->mu); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + } + gpr_mu_unlock(&chand->server->mu); break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_call_next_op(elem, op); + case GRPC_STREAM_CLOSED: + gpr_mu_lock(&chand->server->mu); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + } else if (calld->state == PENDING) { + call_list_remove(calld, PENDING_START); + } + gpr_mu_unlock(&chand->server->mu); break; } + + calld->on_done_recv(calld->recv_user_data, success); +} + +static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { + call_data *calld = elem->call_data; + + if (op->recv_ops) { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->recv_state = op->recv_state; + calld->on_done_recv = op->on_done_recv; + calld->recv_user_data = op->recv_user_data; + op->on_done_recv = server_on_recv; + op->recv_user_data = elem; + } +} + +static void server_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + server_mutate_op(elem, op); + grpc_call_next_op(elem, op); } static void channel_op(grpc_channel_element *elem, @@ -502,7 +510,8 @@ static void shutdown_channel(channel_data *chand) { } static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, + grpc_transport_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -514,6 +523,8 @@ static void init_call_elem(grpc_call_element *elem, gpr_mu_unlock(&chand->server->mu); server_ref(chand->server); + + if (initial_op) server_mutate_op(elem, initial_op); } static void destroy_call_elem(grpc_call_element *elem) { @@ -592,8 +603,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server", + server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "server", }; static void addcq(grpc_server *server, grpc_completion_queue *cq) { @@ -913,6 +925,8 @@ void grpc_server_destroy(grpc_server *server) { channel_data *c; listener *l; size_t i; + call_data *calld; + gpr_mu_lock(&server->mu); if (!server->shutdown) { gpr_mu_unlock(&server->mu); @@ -937,6 +951,15 @@ void grpc_server_destroy(grpc_server *server) { gpr_free(l); } + while ((calld = call_list_remove_head(&server->lists[PENDING_START], + PENDING_START)) != NULL) { + gpr_log(GPR_DEBUG, "server destroys call %p", calld->call); + calld->state = ZOMBIED; + grpc_iomgr_add_callback( + kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + } + for (c = server->root_channel_data.next; c != &server->root_channel_data; c = c->next) { shutdown_channel(c); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index f3b9219f8b..7b5c2f227b 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -33,7 +33,6 @@ #include <grpc/grpc.h> -#include "src/core/channel/http_filter.h" #include "src/core/channel/http_server_filter.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_server.h" @@ -46,8 +45,8 @@ static grpc_transport_setup_result setup_transport(void *server, grpc_transport *transport, grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter, - &grpc_http_filter}; + static grpc_channel_filter const *extra_filters[] = { + &grpc_http_server_filter}; return grpc_server_setup_transport(server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); } |