From acd07f7f40300447dda2459cf21d42814062ca37 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 5 Nov 2018 01:09:59 -0800 Subject: Make inproc transport properly obey status ordering rules --- src/core/ext/transport/inproc/inproc_transport.cc | 38 ++++++++++------------- 1 file changed, 16 insertions(+), 22 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 1c4e2e79fe..61968de4d5 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -608,10 +608,8 @@ void op_state_machine(void* arg, grpc_error* error) { if (other->recv_message_op) { message_transfer_locked(s, other); maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); - } else if (!s->t->is_client && - (s->trailing_md_sent || other->recv_trailing_md_op)) { - // A server send will never be matched if the client is waiting - // for trailing metadata already + } else if (!s->t->is_client && s->trailing_md_sent) { + // A server send will never be matched if the server already sent status s->send_message_op->payload->send_message.send_message.reset(); complete_if_batch_end_locked( s, GRPC_ERROR_NONE, s->send_message_op, @@ -622,11 +620,15 @@ void op_state_machine(void* arg, grpc_error* error) { // Pause a send trailing metadata if there is still an outstanding // send message unless we know that the send message will never get // matched to a receive. This happens on the client if the server has - // already sent status. + // already sent status or on the server if the client has requested + // status if (s->send_trailing_md_op && (!s->send_message_op || (s->t->is_client && - (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) { + (s->trailing_md_recvd || s->to_read_trailing_md_filled)) || + (!s->t->is_client && other && + (other->trailing_md_recvd || other->to_read_trailing_md_filled || + other->recv_trailing_md_op)))) { grpc_metadata_batch* dest = (other == nullptr) ? &s->write_buffer_trailing_md : &other->to_read_trailing_md; @@ -724,16 +726,6 @@ void op_state_machine(void* arg, grpc_error* error) { maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); } } - if (s->recv_trailing_md_op && s->t->is_client && other && - other->send_message_op) { - INPROC_LOG(GPR_INFO, - "op_state_machine %p scheduling trailing-metadata-ready %p", s, - GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata - .recv_trailing_metadata_ready, - GRPC_ERROR_NONE); - maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); - } if (s->to_read_trailing_md_filled) { if (s->trailing_md_recvd) { new_err = @@ -749,6 +741,7 @@ void op_state_machine(void* arg, grpc_error* error) { if (s->recv_message_op != nullptr) { // This message needs to be wrapped up because it will never be // satisfied + *s->recv_message_op->payload->recv_message.recv_message = nullptr; INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( s->recv_message_op->payload->recv_message.recv_message_ready, @@ -811,6 +804,7 @@ void op_state_machine(void* arg, grpc_error* error) { // No further message will come on this stream, so finish off the // recv_message_op INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); + *s->recv_message_op->payload->recv_message.recv_message = nullptr; GRPC_CLOSURE_SCHED( s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); @@ -1013,18 +1007,18 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, } // We want to initiate the closure if: - // 1. We want to send a message and the other side wants to receive or end + // 1. We want to send a message and the other side wants to receive // 2. We want to send trailing metadata and there isn't an unmatched send + // or the other side wants trailing metadata // 3. We want initial metadata and the other side has sent it // 4. We want to receive a message and there is a message ready // 5. There is trailing metadata, even if nothing specifically wants // that because that can shut down the receive message as well - if ((op->send_message && other && - ((other->recv_message_op != nullptr) || - (other->recv_trailing_md_op != nullptr))) || - (op->send_trailing_metadata && !op->send_message) || + if ((op->send_message && other && other->recv_message_op != nullptr) || + (op->send_trailing_metadata && + (!s->send_message_op || (other && other->recv_trailing_md_op))) || (op->recv_initial_metadata && s->to_read_initial_md_filled) || - (op->recv_message && other && (other->send_message_op != nullptr)) || + (op->recv_message && other && other->send_message_op != nullptr) || (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { if (!s->op_closure_scheduled) { GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE); -- cgit v1.2.3