Diffstat (limited to 'src/core/ext/transport/inproc/inproc_transport.cc')
-rw-r--r--  src/core/ext/transport/inproc/inproc_transport.cc | 52 ++++++++++++++++++++++++++++++++++++++++++++++++------
1 file changed, 46 insertions(+), 6 deletions(-)
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 6e5aa5a46b..ac83c7c64d 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -120,7 +120,6 @@ typedef struct inproc_stream {
   struct inproc_stream* stream_list_next;
 } inproc_stream;
 
-static grpc_closure do_nothing_closure;
 static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
 static void op_state_machine(void* arg, grpc_error* error);
 
@@ -373,6 +372,10 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
                                          const char* msg) {
   int is_sm = static_cast<int>(op == s->send_message_op);
   int is_stm = static_cast<int>(op == s->send_trailing_md_op);
+  // TODO(vjpai): We should not consider the recv ops here, since they
+  // have their own callbacks. We should invoke a batch's on_complete
+  // as soon as all of the batch's send ops are complete, even if there
+  // are still recv ops pending.
   int is_rim = static_cast<int>(op == s->recv_initial_md_op);
   int is_rm = static_cast<int>(op == s->recv_message_op);
   int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
@@ -496,6 +499,11 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
     s->send_trailing_md_op = nullptr;
   }
   if (s->recv_trailing_md_op) {
+    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
+               s, error);
+    GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+                           .recv_trailing_metadata_ready,
+                       GRPC_ERROR_REF(error));
     INPROC_LOG(GPR_INFO,
                "fail_helper %p scheduling trailing-md-on-complete %p", s, error);
     complete_if_batch_end_locked(
@@ -639,6 +647,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
     s->trailing_md_sent = true;
     if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
       INPROC_LOG(GPR_INFO,
+                 "op_state_machine %p scheduling trailing-metadata-ready", s);
+      GRPC_CLOSURE_SCHED(
+          s->recv_trailing_md_op->payload->recv_trailing_metadata
+              .recv_trailing_metadata_ready,
+          GRPC_ERROR_NONE);
+      INPROC_LOG(GPR_INFO,
                  "op_state_machine %p scheduling trailing-md-on-complete", s);
       GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
                          GRPC_ERROR_NONE);
@@ -711,6 +725,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
   }
   if (s->recv_trailing_md_op && s->t->is_client && other &&
       other->send_message_op) {
+    INPROC_LOG(GPR_INFO,
+               "op_state_machine %p scheduling trailing-metadata-ready %p", s,
+               GRPC_ERROR_NONE);
+    GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+                           .recv_trailing_metadata_ready,
+                       GRPC_ERROR_NONE);
     maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
   }
   if (s->to_read_trailing_md_filled) {
@@ -766,6 +786,10 @@ static void op_state_machine(void* arg, grpc_error* error) {
       INPROC_LOG(GPR_INFO,
                  "op_state_machine %p scheduling trailing-md-on-complete %p",
                  s, new_err);
+      GRPC_CLOSURE_SCHED(
+          s->recv_trailing_md_op->payload->recv_trailing_metadata
+              .recv_trailing_metadata_ready,
+          GRPC_ERROR_REF(new_err));
       GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
                          GRPC_ERROR_REF(new_err));
       s->recv_trailing_md_op = nullptr;
@@ -859,6 +883,9 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
     // couldn't complete that because we hadn't yet sent out trailing
     // md, now's the chance
     if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
+      GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+                             .recv_trailing_metadata_ready,
+                         GRPC_ERROR_REF(s->cancel_self_error));
       complete_if_batch_end_locked(
           s, s->cancel_self_error, s->recv_trailing_md_op,
           "cancel_stream scheduling trailing-md-on-complete");
@@ -873,6 +900,8 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
   return ret;
 }
 
+static void do_nothing(void* arg, grpc_error* error) {}
+
 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                               grpc_transport_stream_op_batch* op) {
   INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@@ -892,8 +921,14 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
   }
   grpc_error* error = GRPC_ERROR_NONE;
   grpc_closure* on_complete = op->on_complete;
+  // TODO(roth): This is a hack needed because we use data inside of the
+  // closure itself to do the barrier calculation (i.e., to ensure that
+  // we don't schedule the closure until all ops in the batch have been
+  // completed). This can go away once we move to a new C++ closure API
+  // that provides the ability to create a barrier closure.
   if (on_complete == nullptr) {
-    on_complete = &do_nothing_closure;
+    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
+                                    nullptr, grpc_schedule_on_exec_ctx);
   }
 
   if (op->cancel_stream) {
@@ -1026,6 +1061,15 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
       GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
                          GRPC_ERROR_REF(error));
     }
+    if (op->recv_trailing_metadata) {
+      INPROC_LOG(
+          GPR_INFO,
+          "perform_stream_op error %p scheduling trailing-metadata-ready %p",
+          s, error);
+      GRPC_CLOSURE_SCHED(
+          op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+          GRPC_ERROR_REF(error));
+    }
   }
   INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
              error);
@@ -1129,12 +1173,8 @@ static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
 /*******************************************************************************
  * GLOBAL INIT AND DESTROY
  */
-static void do_nothing(void* arg, grpc_error* error) {}
-
 void grpc_inproc_transport_init(void) {
   grpc_core::ExecCtx exec_ctx;
-  GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
-                    grpc_schedule_on_exec_ctx);
   g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
 
   grpc_slice key_tmp = grpc_slice_from_static_string(":path");
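Note: the recurring edit in this patch is that every place a recv_trailing_metadata op finishes now schedules the op's dedicated recv_trailing_metadata_ready callback in addition to the batch-level on_complete. The following is a minimal sketch of that callback ordering using simplified stand-in types, not the real grpc_closure machinery; Closure, schedule, RecvTrailingMdOp, and finish_recv_trailing_md are hypothetical names invented for illustration.

#include <cstdio>

struct Closure {
  void (*cb)(void* arg, bool ok);
  void* arg;
};

// Stand-in for GRPC_CLOSURE_SCHED: run the callback if one was supplied.
static void schedule(Closure* c, bool ok) {
  if (c != nullptr) c->cb(c->arg, ok);
}

struct RecvTrailingMdOp {
  Closure* recv_trailing_metadata_ready;  // per-op callback added by this patch
  Closure* on_complete;                   // batch-level callback
};

// Every completion path in the patch follows this order: first the
// dedicated trailing-metadata callback, then the batch's on_complete.
static void finish_recv_trailing_md(RecvTrailingMdOp* op, bool ok) {
  schedule(op->recv_trailing_metadata_ready, ok);
  schedule(op->on_complete, ok);
}

int main() {
  Closure ready = {[](void*, bool) { std::puts("trailing-metadata-ready"); },
                   nullptr};
  Closure done = {[](void*, bool) { std::puts("on_complete"); }, nullptr};
  RecvTrailingMdOp op = {&ready, &done};
  finish_recv_trailing_md(&op, true);  // prints "trailing-metadata-ready",
                                       // then "on_complete"
}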
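The TODO(roth) comment in the diff points at the underlying design issue: on_complete doubles as the user-visible callback and as the place where batch-completion state is tracked, which is why complete_if_batch_end_locked must test each op pointer. A dedicated barrier closure would make the counting explicit. Below is a rough sketch of that idea under no assumptions about the eventual gRPC closure API; BarrierClosure and its methods are invented for illustration and are not part of gRPC.

#include <atomic>
#include <cstdio>
#include <functional>

class BarrierClosure {
 public:
  // `count` ops must report completion before `done` runs.
  BarrierClosure(int count, std::function<void()> done)
      : remaining_(count), done_(std::move(done)) {}

  // Called once per completed op; the final caller fires the callback.
  void OpComplete() {
    if (remaining_.fetch_sub(1, std::memory_order_acq_rel) == 1) done_();
  }

 private:
  std::atomic<int> remaining_;
  std::function<void()> done_;
};

int main() {
  // A batch with two send ops: on_complete fires after the second finishes,
  // regardless of any recv ops still pending (per the TODO(vjpai) note).
  BarrierClosure batch_done(2, [] { std::puts("batch on_complete"); });
  batch_done.OpComplete();  // e.g. send_initial_metadata finished
  batch_done.OpComplete();  // e.g. send_message finished -> fires here
}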