aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/inproc/inproc_transport.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/inproc/inproc_transport.cc')
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc52
1 files changed, 46 insertions, 6 deletions
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 6e5aa5a46b..ac83c7c64d 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -120,7 +120,6 @@ typedef struct inproc_stream {
struct inproc_stream* stream_list_next;
} inproc_stream;
-static grpc_closure do_nothing_closure;
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
static void op_state_machine(void* arg, grpc_error* error);
@@ -373,6 +372,10 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
const char* msg) {
int is_sm = static_cast<int>(op == s->send_message_op);
int is_stm = static_cast<int>(op == s->send_trailing_md_op);
+ // TODO(vjpai): We should not consider the recv ops here, since they
+ // have their own callbacks. We should invoke a batch's on_complete
+ // as soon as all of the batch's send ops are complete, even if there
+ // are still recv ops pending.
int is_rim = static_cast<int>(op == s->recv_initial_md_op);
int is_rm = static_cast<int>(op == s->recv_message_op);
int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
@@ -496,6 +499,11 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
s->send_trailing_md_op = nullptr;
}
if (s->recv_trailing_md_op) {
+ INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
+ s, error);
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error));
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
s, error);
complete_if_batch_end_locked(
@@ -639,6 +647,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
s->trailing_md_sent = true;
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO,
+ "op_state_machine %p scheduling trailing-metadata-ready", s);
+ GRPC_CLOSURE_SCHED(
+ s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_NONE);
+ INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete", s);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
@@ -711,6 +725,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
}
if (s->recv_trailing_md_op && s->t->is_client && other &&
other->send_message_op) {
+ INPROC_LOG(GPR_INFO,
+ "op_state_machine %p scheduling trailing-metadata-ready %p", s,
+ GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_NONE);
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
}
if (s->to_read_trailing_md_filled) {
@@ -766,6 +786,10 @@ static void op_state_machine(void* arg, grpc_error* error) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err);
+ GRPC_CLOSURE_SCHED(
+ s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(new_err));
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = nullptr;
@@ -859,6 +883,9 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(s->cancel_self_error));
complete_if_batch_end_locked(
s, s->cancel_self_error, s->recv_trailing_md_op,
"cancel_stream scheduling trailing-md-on-complete");
@@ -873,6 +900,8 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
return ret;
}
+static void do_nothing(void* arg, grpc_error* error) {}
+
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@@ -892,8 +921,14 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
}
grpc_error* error = GRPC_ERROR_NONE;
grpc_closure* on_complete = op->on_complete;
+ // TODO(roth): This is a hack needed because we use data inside of the
+ // closure itself to do the barrier calculation (i.e., to ensure that
+ // we don't schedule the closure until all ops in the batch have been
+ // completed). This can go away once we move to a new C++ closure API
+ // that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
- on_complete = &do_nothing_closure;
+ on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
+ nullptr, grpc_schedule_on_exec_ctx);
}
if (op->cancel_stream) {
@@ -1026,6 +1061,15 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
+ if (op->recv_trailing_metadata) {
+ INPROC_LOG(
+ GPR_INFO,
+ "perform_stream_op error %p scheduling trailing-metadata-ready %p",
+ s, error);
+ GRPC_CLOSURE_SCHED(
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error));
+ }
}
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
error);
@@ -1129,12 +1173,8 @@ static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
/*******************************************************************************
* GLOBAL INIT AND DESTROY
*/
-static void do_nothing(void* arg, grpc_error* error) {}
-
void grpc_inproc_transport_init(void) {
grpc_core::ExecCtx exec_ctx;
- GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
- grpc_schedule_on_exec_ctx);
g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
grpc_slice key_tmp = grpc_slice_from_static_string(":path");