From 4f0cd0e82c425533bef9f7ab8119cc542bcc9a41 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 22 Sep 2017 23:34:43 -0700 Subject: Add flow control to inproc transport so send needs a matching recv; fix some tests that assumed some sends could always go out --- src/core/ext/transport/inproc/inproc_transport.cc | 669 ++++++++++------------ 1 file changed, 299 insertions(+), 370 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 1001d74c22..67a8358927 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -62,96 +62,22 @@ typedef struct inproc_transport { struct inproc_stream *stream_list; } inproc_transport; -typedef struct sb_list_entry { - grpc_slice_buffer sb; - struct sb_list_entry *next; -} sb_list_entry; - -// Specialize grpc_byte_stream for our use case -typedef struct { - grpc_byte_stream base; - sb_list_entry *le; - grpc_error *shutdown_error; -} inproc_slice_byte_stream; - -typedef struct { - // TODO (vjpai): Add some inlined elements to avoid alloc in simple cases - sb_list_entry *head; - sb_list_entry *tail; -} slice_buffer_list; - -static void slice_buffer_list_init(slice_buffer_list *l) { - l->head = NULL; - l->tail = NULL; -} - -static void sb_list_entry_destroy(grpc_exec_ctx *exec_ctx, sb_list_entry *le) { - grpc_slice_buffer_destroy_internal(exec_ctx, &le->sb); - gpr_free(le); -} - -static void slice_buffer_list_destroy(grpc_exec_ctx *exec_ctx, - slice_buffer_list *l) { - sb_list_entry *curr = l->head; - while (curr != NULL) { - sb_list_entry *le = curr; - curr = curr->next; - sb_list_entry_destroy(exec_ctx, le); - } - l->head = NULL; - l->tail = NULL; -} - -static bool slice_buffer_list_empty(slice_buffer_list *l) { - return l->head == NULL; -} - -static void slice_buffer_list_append_entry(slice_buffer_list *l, - sb_list_entry *next) { - next->next = NULL; - if (l->tail) { - l->tail->next = next; - l->tail = next; - } else { - l->head = next; - l->tail = next; - } -} - -static grpc_slice_buffer *slice_buffer_list_append(slice_buffer_list *l) { - sb_list_entry *next = (sb_list_entry *)gpr_malloc(sizeof(*next)); - grpc_slice_buffer_init(&next->sb); - slice_buffer_list_append_entry(l, next); - return &next->sb; -} - -static sb_list_entry *slice_buffer_list_pophead(slice_buffer_list *l) { - sb_list_entry *ret = l->head; - l->head = l->head->next; - if (l->head == NULL) { - l->tail = NULL; - } - return ret; -} - typedef struct inproc_stream { inproc_transport *t; grpc_metadata_batch to_read_initial_md; uint32_t to_read_initial_md_flags; bool to_read_initial_md_filled; - slice_buffer_list to_read_message; grpc_metadata_batch to_read_trailing_md; bool to_read_trailing_md_filled; - bool reads_needed; - bool read_closure_scheduled; - grpc_closure read_closure; + bool ops_needed; + bool op_closure_scheduled; + grpc_closure op_closure; // Write buffer used only during gap at init time when client-side // stream is set up but server side stream is not yet set up grpc_metadata_batch write_buffer_initial_md; bool write_buffer_initial_md_filled; uint32_t write_buffer_initial_md_flags; grpc_millis write_buffer_deadline; - slice_buffer_list write_buffer_message; grpc_metadata_batch write_buffer_trailing_md; bool write_buffer_trailing_md_filled; grpc_error *write_buffer_cancel_error; @@ -164,11 +90,15 @@ typedef struct inproc_stream { gpr_arena *arena; + grpc_transport_stream_op_batch *send_message_op; + grpc_transport_stream_op_batch *send_trailing_md_op; grpc_transport_stream_op_batch *recv_initial_md_op; grpc_transport_stream_op_batch *recv_message_op; grpc_transport_stream_op_batch *recv_trailing_md_op; - inproc_slice_byte_stream recv_message_stream; + grpc_slice_buffer recv_message; + grpc_slice_buffer_stream recv_stream; + bool recv_inited; bool initial_md_sent; bool trailing_md_sent; @@ -187,54 +117,11 @@ typedef struct inproc_stream { struct inproc_stream *stream_list_next; } inproc_stream; -static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, size_t max, - grpc_closure *on_complete) { - // Because inproc transport always provides the entire message atomically, - // the byte stream always has data available when this function is called. - // Thus, this function always returns true (unlike other transports) and - // there is never any need to schedule a closure - return true; -} - -static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, - grpc_slice *slice) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - if (stream->shutdown_error != GRPC_ERROR_NONE) { - return GRPC_ERROR_REF(stream->shutdown_error); - } - *slice = grpc_slice_buffer_take_first(&stream->le->sb); - return GRPC_ERROR_NONE; -} - -static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, - grpc_error *error) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - GRPC_ERROR_UNREF(stream->shutdown_error); - stream->shutdown_error = error; -} - -static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - sb_list_entry_destroy(exec_ctx, stream->le); - GRPC_ERROR_UNREF(stream->shutdown_error); -} - -static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = { - inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull, - inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy}; - -void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s, - sb_list_entry *le) { - s->base.length = (uint32_t)le->sb.length; - s->base.flags = 0; - s->base.vtable = &inproc_slice_byte_stream_vtable; - s->le = le; - s->shutdown_error = GRPC_ERROR_NONE; -} +static grpc_closure do_nothing_closure; +static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, + grpc_error *error); +static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); static void ref_transport(inproc_transport *t) { INPROC_LOG(GPR_DEBUG, "ref_transport %p", t); @@ -280,12 +167,14 @@ static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s, static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) { INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s); - slice_buffer_list_destroy(exec_ctx, &s->to_read_message); - slice_buffer_list_destroy(exec_ctx, &s->write_buffer_message); GRPC_ERROR_UNREF(s->write_buffer_cancel_error); GRPC_ERROR_UNREF(s->cancel_self_error); GRPC_ERROR_UNREF(s->cancel_other_error); + if (s->recv_inited) { + grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message); + } + unref_transport(exec_ctx, s->t); if (s->closure_at_destroy) { @@ -293,9 +182,6 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) { } } -static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); - static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client, bool is_initial) { for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL; @@ -359,11 +245,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->write_buffer_initial_md_filled = false; grpc_metadata_batch_init(&s->write_buffer_trailing_md); s->write_buffer_trailing_md_filled = false; - slice_buffer_list_init(&s->to_read_message); - slice_buffer_list_init(&s->write_buffer_message); - s->reads_needed = false; - s->read_closure_scheduled = false; - GRPC_CLOSURE_INIT(&s->read_closure, read_state_machine, s, + s->ops_needed = false; + s->op_closure_scheduled = false; + GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s, grpc_schedule_on_exec_ctx); s->t = t; s->closure_at_destroy = NULL; @@ -425,11 +309,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md); cs->write_buffer_initial_md_filled = false; } - while (!slice_buffer_list_empty(&cs->write_buffer_message)) { - slice_buffer_list_append_entry( - &s->to_read_message, - slice_buffer_list_pophead(&cs->write_buffer_message)); - } if (cs->write_buffer_trailing_md_filled) { fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0, &s->to_read_trailing_md, NULL, @@ -488,9 +367,39 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, } } +// Call the on_complete closure associated with this stream_op_batch if +// this stream_op_batch is only one of the pending operations for this +// stream. This is called when one of the pending operations for the stream +// is done and about to be NULLed out +static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *s, grpc_error *error, + grpc_transport_stream_op_batch *op, + const char *msg) { + int is_sm = (int)(op == s->send_message_op); + int is_stm = (int)(op == s->send_trailing_md_op); + int is_rim = (int)(op == s->recv_initial_md_op); + int is_rm = (int)(op == s->recv_message_op); + int is_rtm = (int)(op == s->recv_trailing_md_op); + + if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) { + INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error)); + } +} + +static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *s, + grpc_error *error) { + if (s && s->ops_needed && !s->op_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error)); + s->op_closure_scheduled = true; + s->ops_needed = false; + } +} + static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, grpc_error *error) { - INPROC_LOG(GPR_DEBUG, "read_state_machine %p fail_helper", s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s); // If we're failing this side, we need to make sure that // we also send or have already sent trailing metadata if (!s->trailing_md_sent) { @@ -512,14 +421,7 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(error); } - if (other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, - GRPC_ERROR_REF(error)); - other->read_closure_scheduled = true; - } - other->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, other, error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(error); } @@ -564,14 +466,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, err); // Last use of err so no need to REF and then UNREF it - if ((s->recv_initial_md_op != s->recv_message_op) && - (s->recv_initial_md_op != s->recv_trailing_md_op)) { - INPROC_LOG(GPR_DEBUG, - "fail_helper %p scheduling initial-metadata-on-complete %p", - error, s); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete, - GRPC_ERROR_REF(error)); - } + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_initial_md_op, + "fail_helper scheduling recv-initial-metadata-on-complete"); s->recv_initial_md_op = NULL; } if (s->recv_message_op) { @@ -580,20 +477,30 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-on-complete %p", - s, error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(error)); - } + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_message_op, + "fail_helper scheduling recv-message-on-complete"); s->recv_message_op = NULL; } + if (s->send_message_op) { + complete_if_batch_end_locked( + exec_ctx, s, error, s->send_message_op, + "fail_helper scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + if (s->send_trailing_md_op) { + complete_if_batch_end_locked( + exec_ctx, s, error, s->send_trailing_md_op, + "fail_helper scheduling send-trailng-md-on-complete"); + s->send_trailing_md_op = NULL; + } if (s->recv_trailing_md_op) { INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling trailing-md-on-complete %p", s, error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_REF(error)); + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_trailing_md_op, + "fail_helper scheduling recv-trailing-metadata-on-complete"); s->recv_trailing_md_op = NULL; } close_other_side_locked(exec_ctx, s, "fail_helper:other_side"); @@ -602,12 +509,61 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, GRPC_ERROR_UNREF(error); } -static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void message_transfer_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *sender, + inproc_stream *receiver) { + size_t remaining = + sender->send_message_op->payload->send_message.send_message->length; + if (receiver->recv_inited) { + grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message); + } + grpc_slice_buffer_init(&receiver->recv_message); + receiver->recv_inited = true; + do { + grpc_slice message_slice; + grpc_closure unused; + GPR_ASSERT(grpc_byte_stream_next( + exec_ctx, sender->send_message_op->payload->send_message.send_message, + SIZE_MAX, &unused)); + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, sender->send_message_op->payload->send_message.send_message, + &message_slice); + if (error != GRPC_ERROR_NONE) { + cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error)); + break; + } + GPR_ASSERT(error == GRPC_ERROR_NONE); + remaining -= GRPC_SLICE_LENGTH(message_slice); + grpc_slice_buffer_add(&receiver->recv_message, message_slice); + } while (remaining > 0); + + grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message, + 0); + *receiver->recv_message_op->payload->recv_message.recv_message = + &receiver->recv_stream.base; + INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready", + receiver); + GRPC_CLOSURE_SCHED( + exec_ctx, + receiver->recv_message_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); + complete_if_batch_end_locked( + exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op, + "message_transfer scheduling sender on_complete"); + complete_if_batch_end_locked( + exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op, + "message_transfer scheduling receiver on_complete"); + + receiver->recv_message_op = NULL; + sender->send_message_op = NULL; +} + +static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { // This function gets called when we have contents in the unprocessed reads // Get what we want based on our ops wanted // Schedule our appropriate closures - // and then return to reads_needed state if still needed + // and then return to ops_needed state if still needed // Since this is a closure directly invoked by the combiner, it should not // unref the error parameter explicitly; the combiner will do that implicitly @@ -615,12 +571,14 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, bool needs_close = false; - INPROC_LOG(GPR_DEBUG, "read_state_machine %p", arg); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg); inproc_stream *s = (inproc_stream *)arg; gpr_mu *mu = &s->t->mu->mu; // keep aside in case s gets closed gpr_mu_lock(mu); - s->read_closure_scheduled = false; + s->op_closure_scheduled = false; // cancellation takes precedence + inproc_stream *other = s->other_side; + if (s->cancel_self_error != GRPC_ERROR_NONE) { fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error)); goto done; @@ -632,89 +590,116 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, goto done; } - if (s->recv_initial_md_op) { - if (!s->to_read_initial_md_filled) { - // We entered the state machine on some other kind of read even though - // we still haven't satisfied initial md . That's an error. - new_err = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected frame sequencing"); - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for no " - "initial md %p", - s, new_err); + if (s->send_message_op && other) { + if (other->recv_message_op) { + message_transfer_locked(exec_ctx, s, other); + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + } else if (!s->t->is_client && + (s->trailing_md_sent || other->recv_trailing_md_op)) { + // A server send will never be matched if the client is waiting + // for trailing metadata already + complete_if_batch_end_locked( + exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + } + // Pause a send trailing metadata if there is still an outstanding + // send message unless we know that the send message will never get + // matched to a receive. This happens on the client if the server has + // already sent status. + if (s->send_trailing_md_op && + (!s->send_message_op || + (s->t->is_client && + (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) { + grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md + : &other->to_read_trailing_md; + bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled + : &other->to_read_trailing_md_filled; + if (*destfilled || s->trailing_md_sent) { + // The buffer is already in use; that's an error! + INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s); + new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata"); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); goto done; - } else if (s->initial_md_recvd) { + } else { + if (other && !other->closed) { + fill_in_metadata(exec_ctx, s, + s->send_trailing_md_op->payload->send_trailing_metadata + .send_trailing_metadata, + 0, dest, NULL, destfilled); + } + s->trailing_md_sent = true; + if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling trailing-md-on-complete", s); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, + GRPC_ERROR_NONE); + s->recv_trailing_md_op = NULL; + needs_close = true; + } + } + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + complete_if_batch_end_locked( + exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op, + "op_state_machine scheduling send-trailing-metadata-on-complete"); + s->send_trailing_md_op = NULL; + } + if (s->recv_initial_md_op) { + if (s->initial_md_recvd) { new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md"); INPROC_LOG( GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for already " + "op_state_machine %p scheduling on_complete errors for already " "recvd initial md %p", s, new_err); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); goto done; } - s->initial_md_recvd = true; - new_err = fill_in_metadata( - exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags, - s->recv_initial_md_op->payload->recv_initial_metadata - .recv_initial_metadata, - s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, NULL); - s->recv_initial_md_op->payload->recv_initial_metadata.recv_initial_metadata - ->deadline = s->deadline; - grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); - s->to_read_initial_md_filled = false; - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling initial-metadata-ready %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, - s->recv_initial_md_op->payload->recv_initial_metadata - .recv_initial_metadata_ready, - GRPC_ERROR_REF(new_err)); - if ((s->recv_initial_md_op != s->recv_message_op) && - (s->recv_initial_md_op != s->recv_trailing_md_op)) { - INPROC_LOG( - GPR_DEBUG, - "read_state_machine %p scheduling initial-metadata-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete, - GRPC_ERROR_REF(new_err)); - } - s->recv_initial_md_op = NULL; - - if (new_err != GRPC_ERROR_NONE) { + if (s->to_read_initial_md_filled) { + s->initial_md_recvd = true; + new_err = fill_in_metadata( + exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata, + s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, + NULL); + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata->deadline = s->deadline; + grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); + s->to_read_initial_md_filled = false; INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors2 %p", s, + "op_state_machine %p scheduling initial-metadata-ready %p", s, new_err); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); - goto done; + GRPC_CLOSURE_SCHED(exec_ctx, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata_ready, + GRPC_ERROR_REF(new_err)); + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_initial_md_op, + "op_state_machine scheduling recv-initial-metadata-on-complete"); + s->recv_initial_md_op = NULL; + + if (new_err != GRPC_ERROR_NONE) { + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling on_complete errors2 %p", s, + new_err); + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + goto done; + } } } - if (s->to_read_initial_md_filled) { - new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected recv frame"); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); - goto done; - } - if (!slice_buffer_list_empty(&s->to_read_message) && s->recv_message_op) { - inproc_slice_byte_stream_init( - &s->recv_message_stream, - slice_buffer_list_pophead(&s->to_read_message)); - *s->recv_message_op->payload->recv_message.recv_message = - &s->recv_message_stream.base; - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s); - GRPC_CLOSURE_SCHED( - exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, - GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); + if (s->recv_message_op) { + if (other && other->send_message_op) { + message_transfer_locked(exec_ctx, other, s); + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); } - s->recv_message_op = NULL; + } + if (s->recv_trailing_md_op && s->t->is_client && other && + other->send_message_op) { + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); } if (s->to_read_trailing_md_filled) { if (s->trailing_md_recvd) { @@ -722,7 +707,7 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md"); INPROC_LOG( GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for already " + "op_state_machine %p scheduling on_complete errors for already " "recvd trailing md %p", s, new_err); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); @@ -731,21 +716,24 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, if (s->recv_message_op != NULL) { // This message needs to be wrapped up because it will never be // satisfied - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", - s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); - } + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_message_op, + "op_state_machine scheduling recv-message-on-complete"); s->recv_message_op = NULL; } + if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) { + // Nothing further will try to receive from this stream, so finish off + // any outstanding send_message op + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } if (s->recv_trailing_md_op != NULL) { // We wanted trailing metadata and we got it s->trailing_md_recvd = true; @@ -763,61 +751,65 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, // (If the server hasn't already sent its trailing md, it doesn't have // a final status, so don't mark this op complete) if (s->t->is_client || s->trailing_md_sent) { - INPROC_LOG( - GPR_DEBUG, - "read_state_machine %p scheduling trailing-md-on-complete %p", s, - new_err); + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling trailing-md-on-complete %p", + s, new_err); GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, GRPC_ERROR_REF(new_err)); s->recv_trailing_md_op = NULL; needs_close = true; } else { INPROC_LOG(GPR_DEBUG, - "read_state_machine %p server needs to delay handling " + "op_state_machine %p server needs to delay handling " "trailing-md-on-complete %p", s, new_err); } } else { INPROC_LOG( GPR_DEBUG, - "read_state_machine %p has trailing md but not yet waiting for it", - s); + "op_state_machine %p has trailing md but not yet waiting for it", s); } } if (s->trailing_md_recvd && s->recv_message_op) { // No further message will come on this stream, so finish off the // recv_message_op - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); - } + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_message_op, + "op_state_machine scheduling recv-message-on-complete"); s->recv_message_op = NULL; } - if (s->recv_message_op || s->recv_trailing_md_op) { + if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) && + s->send_message_op) { + // Nothing further will try to receive from this stream, so finish off + // any outstanding send_message op + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op || + s->recv_message_op || s->recv_trailing_md_op) { // Didn't get the item we wanted so we still need to get // rescheduled - INPROC_LOG(GPR_DEBUG, "read_state_machine %p still needs closure %p %p", s, - s->recv_message_op, s->recv_trailing_md_op); - s->reads_needed = true; + INPROC_LOG( + GPR_DEBUG, "op_state_machine %p still needs closure %p %p %p %p %p", s, + s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op, + s->recv_message_op, s->recv_trailing_md_op); + s->ops_needed = true; } done: if (needs_close) { - close_other_side_locked(exec_ctx, s, "read_state_machine"); + close_other_side_locked(exec_ctx, s, "op_state_machine"); close_stream_locked(exec_ctx, s); } gpr_mu_unlock(mu); GRPC_ERROR_UNREF(new_err); } -static grpc_closure do_nothing_closure; - static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, grpc_error *error) { bool ret = false; // was the cancel accepted @@ -826,14 +818,7 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (s->cancel_self_error == GRPC_ERROR_NONE) { ret = true; s->cancel_self_error = GRPC_ERROR_REF(error); - if (s->reads_needed) { - if (!s->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, - GRPC_ERROR_REF(s->cancel_self_error)); - s->read_closure_scheduled = true; - } - s->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error); // Send trailing md to the other side indicating cancellation, even if we // already have s->trailing_md_sent = true; @@ -853,14 +838,8 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error); } - if (other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, - GRPC_ERROR_REF(other->cancel_other_error)); - other->read_closure_scheduled = true; - } - other->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, other, + other->cancel_other_error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error); } @@ -869,11 +848,9 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, // couldn't complete that because we hadn't yet sent out trailing // md, now's the chance if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "cancel_stream %p scheduling trailing-md-on-complete %p", s, - s->cancel_self_error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_REF(s->cancel_self_error)); + complete_if_batch_end_locked( + exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op, + "cancel_stream scheduling trailing-md-on-complete"); s->recv_trailing_md_op = NULL; } } @@ -918,7 +895,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, // already self-canceled so still give it an error error = GRPC_ERROR_REF(s->cancel_self_error); } else { - INPROC_LOG(GPR_DEBUG, "perform_stream_op %p%s%s%s%s%s%s", s, + INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %s%s%s%s%s%s%s", s, + s->t->is_client ? "client" : "server", op->send_initial_metadata ? " send_initial_metadata" : "", op->send_message ? " send_message" : "", op->send_trailing_metadata ? " send_trailing_metadata" : "", @@ -929,10 +907,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, bool needs_close = false; + inproc_stream *other = s->other_side; if (error == GRPC_ERROR_NONE && - (op->send_initial_metadata || op->send_message || - op->send_trailing_metadata)) { - inproc_stream *other = s->other_side; + (op->send_initial_metadata || op->send_trailing_metadata)) { if (s->t->is_closed) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"); } @@ -963,72 +940,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->initial_md_sent = true; } } - } - if (error == GRPC_ERROR_NONE && op->send_message) { - size_t remaining = op->payload->send_message.send_message->length; - grpc_slice_buffer *dest = slice_buffer_list_append( - (other == NULL) ? &s->write_buffer_message : &other->to_read_message); - do { - grpc_slice message_slice; - grpc_closure unused; - GPR_ASSERT(grpc_byte_stream_next(exec_ctx, - op->payload->send_message.send_message, - SIZE_MAX, &unused)); - error = grpc_byte_stream_pull( - exec_ctx, op->payload->send_message.send_message, &message_slice); - if (error != GRPC_ERROR_NONE) { - cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error)); - break; - } - GPR_ASSERT(error == GRPC_ERROR_NONE); - remaining -= GRPC_SLICE_LENGTH(message_slice); - grpc_slice_buffer_add(dest, message_slice); - } while (remaining != 0); - grpc_byte_stream_destroy(exec_ctx, - op->payload->send_message.send_message); - } - if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) { - grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md - : &other->to_read_trailing_md; - bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled - : &other->to_read_trailing_md_filled; - if (*destfilled || s->trailing_md_sent) { - // The buffer is already in use; that's an error! - INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s); - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata"); - } else { - if (!other->closed) { - fill_in_metadata( - exec_ctx, s, - op->payload->send_trailing_metadata.send_trailing_metadata, 0, - dest, NULL, destfilled); - } - s->trailing_md_sent = true; - if (!s->t->is_client && s->trailing_md_recvd && - s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "perform_stream_op %p scheduling trailing-md-on-complete", - s); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_NONE); - s->recv_trailing_md_op = NULL; - needs_close = true; - } - } - } - if (other != NULL && other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, error); - other->read_closure_scheduled = true; - } - other->reads_needed = false; + maybe_schedule_op_closure_locked(exec_ctx, other, error); } } + if (error == GRPC_ERROR_NONE && - (op->recv_initial_metadata || op->recv_message || + (op->send_message || op->send_trailing_metadata || + op->recv_initial_metadata || op->recv_message || op->recv_trailing_metadata)) { - // If there are any reads, mark it so that the read closure will react to - // them + // Mark ops that need to be processed by the closure + if (op->send_message) { + s->send_message_op = op; + } + if (op->send_trailing_metadata) { + s->send_trailing_md_op = op; + } if (op->recv_initial_metadata) { s->recv_initial_md_op = op; } @@ -1040,25 +966,28 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } // We want to initiate the closure if: - // 1. There is initial metadata and something ready to take that - // 2. There is a message and something ready to take it - // 3. There is trailing metadata, even if nothing specifically wants - // that because that can shut down the message as well - if ((s->to_read_initial_md_filled && op->recv_initial_metadata) || - ((!slice_buffer_list_empty(&s->to_read_message) || - s->trailing_md_recvd) && - op->recv_message) || - (s->to_read_trailing_md_filled)) { - if (!s->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, GRPC_ERROR_NONE); - s->read_closure_scheduled = true; + // 1. We want to send a message and the other side wants to receive or end + // 2. We want to send trailing metadata and there isn't an unmatched send + // 3. We want initial metadata and the other side has sent it + // 4. We want to receive a message and there is a message ready + // 5. There is trailing metadata, even if nothing specifically wants + // that because that can shut down the receive message as well + if ((op->send_message && other && ((other->recv_message_op != NULL) || + (other->recv_trailing_md_op != NULL))) || + (op->send_trailing_metadata && !op->send_message) || + (op->recv_initial_metadata && s->to_read_initial_md_filled) || + (op->recv_message && (other && other->send_message_op != NULL)) || + (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { + if (!s->op_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE); + s->op_closure_scheduled = true; } } else { - s->reads_needed = true; + s->ops_needed = true; } } else { if (error != GRPC_ERROR_NONE) { - // Schedule op's read closures that we didn't push to read state machine + // Schedule op's closures that we didn't push to op state machine if (op->recv_initial_metadata) { INPROC_LOG( GPR_DEBUG, -- cgit v1.2.3