aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/inproc/inproc_transport.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/inproc/inproc_transport.cc')
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc580
1 files changed, 280 insertions, 300 deletions
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index b0ca7f8207..61968de4d5 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -40,18 +40,68 @@
if (grpc_inproc_trace.enabled()) gpr_log(__VA_ARGS__); \
} while (0)
-static grpc_slice g_empty_slice;
-static grpc_slice g_fake_path_key;
-static grpc_slice g_fake_path_value;
-static grpc_slice g_fake_auth_key;
-static grpc_slice g_fake_auth_value;
+namespace {
+grpc_slice g_empty_slice;
+grpc_slice g_fake_path_key;
+grpc_slice g_fake_path_value;
+grpc_slice g_fake_auth_key;
+grpc_slice g_fake_auth_value;
+
+struct inproc_stream;
+bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
+void op_state_machine(void* arg, grpc_error* error);
+void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
+ bool is_initial);
+grpc_error* fill_in_metadata(inproc_stream* s,
+ const grpc_metadata_batch* metadata,
+ uint32_t flags, grpc_metadata_batch* out_md,
+ uint32_t* outflags, bool* markfilled);
+
+struct shared_mu {
+ shared_mu() {
+ // Share one lock between both sides since both sides get affected
+ gpr_mu_init(&mu);
+ gpr_ref_init(&refs, 2);
+ }
-typedef struct {
gpr_mu mu;
gpr_refcount refs;
-} shared_mu;
+};
+
+struct inproc_transport {
+ inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu,
+ bool is_client)
+ : mu(mu), is_client(is_client) {
+ base.vtable = vtable;
+ // Start each side of transport with 2 refs since they each have a ref
+ // to the other
+ gpr_ref_init(&refs, 2);
+ grpc_connectivity_state_init(&connectivity, GRPC_CHANNEL_READY,
+ is_client ? "inproc_client" : "inproc_server");
+ }
+
+ ~inproc_transport() {
+ grpc_connectivity_state_destroy(&connectivity);
+ if (gpr_unref(&mu->refs)) {
+ gpr_free(mu);
+ }
+ }
+
+ void ref() {
+ INPROC_LOG(GPR_INFO, "ref_transport %p", this);
+ gpr_ref(&refs);
+ }
+
+ void unref() {
+ INPROC_LOG(GPR_INFO, "unref_transport %p", this);
+ if (!gpr_unref(&refs)) {
+ return;
+ }
+ INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this);
+ this->~inproc_transport();
+ gpr_free(this);
+ }
-typedef struct inproc_transport {
grpc_transport base;
shared_mu* mu;
gpr_refcount refs;
@@ -60,128 +110,174 @@ typedef struct inproc_transport {
void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
const void* server_data);
void* accept_stream_data;
- bool is_closed;
+ bool is_closed = false;
struct inproc_transport* other_side;
- struct inproc_stream* stream_list;
-} inproc_transport;
+ struct inproc_stream* stream_list = nullptr;
+};
+
+struct inproc_stream {
+ inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount,
+ const void* server_data, gpr_arena* arena)
+ : t(t), refs(refcount), arena(arena) {
+ // Ref this stream right now for ctor and list.
+ ref("inproc_init_stream:init");
+ ref("inproc_init_stream:list");
+
+ grpc_metadata_batch_init(&to_read_initial_md);
+ grpc_metadata_batch_init(&to_read_trailing_md);
+ GRPC_CLOSURE_INIT(&op_closure, op_state_machine, this,
+ grpc_schedule_on_exec_ctx);
+ grpc_metadata_batch_init(&write_buffer_initial_md);
+ grpc_metadata_batch_init(&write_buffer_trailing_md);
+
+ stream_list_prev = nullptr;
+ gpr_mu_lock(&t->mu->mu);
+ stream_list_next = t->stream_list;
+ if (t->stream_list) {
+ t->stream_list->stream_list_prev = this;
+ }
+ t->stream_list = this;
+ gpr_mu_unlock(&t->mu->mu);
+
+ if (!server_data) {
+ t->ref();
+ inproc_transport* st = t->other_side;
+ st->ref();
+ other_side = nullptr; // will get filled in soon
+ // Pass the client-side stream address to the server-side for a ref
+ ref("inproc_init_stream:clt"); // ref it now on behalf of server
+ // side to avoid destruction
+ INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p",
+ st->accept_stream_cb, st->accept_stream_data);
+ (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)this);
+ } else {
+ // This is the server-side and is being called through accept_stream_cb
+ inproc_stream* cs = (inproc_stream*)server_data;
+ other_side = cs;
+ // Ref the server-side stream on behalf of the client now
+ ref("inproc_init_stream:srv");
+
+ // Now we are about to affect the other side, so lock the transport
+ // to make sure that it doesn't get destroyed
+ gpr_mu_lock(&t->mu->mu);
+ cs->other_side = this;
+ // Now transfer from the other side's write_buffer if any to the to_read
+ // buffer
+ if (cs->write_buffer_initial_md_filled) {
+ fill_in_metadata(this, &cs->write_buffer_initial_md,
+ cs->write_buffer_initial_md_flags, &to_read_initial_md,
+ &to_read_initial_md_flags, &to_read_initial_md_filled);
+ deadline = GPR_MIN(deadline, cs->write_buffer_deadline);
+ grpc_metadata_batch_clear(&cs->write_buffer_initial_md);
+ cs->write_buffer_initial_md_filled = false;
+ }
+ if (cs->write_buffer_trailing_md_filled) {
+ fill_in_metadata(this, &cs->write_buffer_trailing_md, 0,
+ &to_read_trailing_md, nullptr,
+ &to_read_trailing_md_filled);
+ grpc_metadata_batch_clear(&cs->write_buffer_trailing_md);
+ cs->write_buffer_trailing_md_filled = false;
+ }
+ if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
+ cancel_other_error = cs->write_buffer_cancel_error;
+ cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
+ }
+
+ gpr_mu_unlock(&t->mu->mu);
+ }
+ }
+
+ ~inproc_stream() {
+ GRPC_ERROR_UNREF(write_buffer_cancel_error);
+ GRPC_ERROR_UNREF(cancel_self_error);
+ GRPC_ERROR_UNREF(cancel_other_error);
+
+ if (recv_inited) {
+ grpc_slice_buffer_destroy_internal(&recv_message);
+ }
+
+ t->unref();
+
+ if (closure_at_destroy) {
+ GRPC_CLOSURE_SCHED(closure_at_destroy, GRPC_ERROR_NONE);
+ }
+ }
+
+#ifndef NDEBUG
+#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
+#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
+#else
+#define STREAM_REF(refs, reason) grpc_stream_ref(refs)
+#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
+#endif
+ void ref(const char* reason) {
+ INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason);
+ STREAM_REF(refs, reason);
+ }
+
+ void unref(const char* reason) {
+ INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason);
+ STREAM_UNREF(refs, reason);
+ }
+#undef STREAM_REF
+#undef STREAM_UNREF
-typedef struct inproc_stream {
inproc_transport* t;
grpc_metadata_batch to_read_initial_md;
- uint32_t to_read_initial_md_flags;
- bool to_read_initial_md_filled;
+ uint32_t to_read_initial_md_flags = 0;
+ bool to_read_initial_md_filled = false;
grpc_metadata_batch to_read_trailing_md;
- bool to_read_trailing_md_filled;
- bool ops_needed;
- bool op_closure_scheduled;
+ bool to_read_trailing_md_filled = false;
+ bool ops_needed = false;
+ bool op_closure_scheduled = false;
grpc_closure op_closure;
// Write buffer used only during gap at init time when client-side
// stream is set up but server side stream is not yet set up
grpc_metadata_batch write_buffer_initial_md;
- bool write_buffer_initial_md_filled;
- uint32_t write_buffer_initial_md_flags;
- grpc_millis write_buffer_deadline;
+ bool write_buffer_initial_md_filled = false;
+ uint32_t write_buffer_initial_md_flags = 0;
+ grpc_millis write_buffer_deadline = GRPC_MILLIS_INF_FUTURE;
grpc_metadata_batch write_buffer_trailing_md;
- bool write_buffer_trailing_md_filled;
- grpc_error* write_buffer_cancel_error;
+ bool write_buffer_trailing_md_filled = false;
+ grpc_error* write_buffer_cancel_error = GRPC_ERROR_NONE;
struct inproc_stream* other_side;
- bool other_side_closed; // won't talk anymore
- bool write_buffer_other_side_closed; // on hold
+ bool other_side_closed = false; // won't talk anymore
+ bool write_buffer_other_side_closed = false; // on hold
grpc_stream_refcount* refs;
- grpc_closure* closure_at_destroy;
+ grpc_closure* closure_at_destroy = nullptr;
gpr_arena* arena;
- grpc_transport_stream_op_batch* send_message_op;
- grpc_transport_stream_op_batch* send_trailing_md_op;
- grpc_transport_stream_op_batch* recv_initial_md_op;
- grpc_transport_stream_op_batch* recv_message_op;
- grpc_transport_stream_op_batch* recv_trailing_md_op;
+ grpc_transport_stream_op_batch* send_message_op = nullptr;
+ grpc_transport_stream_op_batch* send_trailing_md_op = nullptr;
+ grpc_transport_stream_op_batch* recv_initial_md_op = nullptr;
+ grpc_transport_stream_op_batch* recv_message_op = nullptr;
+ grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr;
grpc_slice_buffer recv_message;
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream;
- bool recv_inited;
+ bool recv_inited = false;
- bool initial_md_sent;
- bool trailing_md_sent;
- bool initial_md_recvd;
- bool trailing_md_recvd;
+ bool initial_md_sent = false;
+ bool trailing_md_sent = false;
+ bool initial_md_recvd = false;
+ bool trailing_md_recvd = false;
- bool closed;
+ bool closed = false;
- grpc_error* cancel_self_error;
- grpc_error* cancel_other_error;
+ grpc_error* cancel_self_error = GRPC_ERROR_NONE;
+ grpc_error* cancel_other_error = GRPC_ERROR_NONE;
- grpc_millis deadline;
+ grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
- bool listed;
+ bool listed = true;
struct inproc_stream* stream_list_prev;
struct inproc_stream* stream_list_next;
-} inproc_stream;
-
-static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
-static void op_state_machine(void* arg, grpc_error* error);
-
-static void ref_transport(inproc_transport* t) {
- INPROC_LOG(GPR_INFO, "ref_transport %p", t);
- gpr_ref(&t->refs);
-}
-
-static void really_destroy_transport(inproc_transport* t) {
- INPROC_LOG(GPR_INFO, "really_destroy_transport %p", t);
- grpc_connectivity_state_destroy(&t->connectivity);
- if (gpr_unref(&t->mu->refs)) {
- gpr_free(t->mu);
- }
- gpr_free(t);
-}
-
-static void unref_transport(inproc_transport* t) {
- INPROC_LOG(GPR_INFO, "unref_transport %p", t);
- if (gpr_unref(&t->refs)) {
- really_destroy_transport(t);
- }
-}
-
-#ifndef NDEBUG
-#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
-#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
-#else
-#define STREAM_REF(refs, reason) grpc_stream_ref(refs)
-#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
-#endif
-
-static void ref_stream(inproc_stream* s, const char* reason) {
- INPROC_LOG(GPR_INFO, "ref_stream %p %s", s, reason);
- STREAM_REF(s->refs, reason);
-}
-
-static void unref_stream(inproc_stream* s, const char* reason) {
- INPROC_LOG(GPR_INFO, "unref_stream %p %s", s, reason);
- STREAM_UNREF(s->refs, reason);
-}
-
-static void really_destroy_stream(inproc_stream* s) {
- INPROC_LOG(GPR_INFO, "really_destroy_stream %p", s);
+};
- GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
- GRPC_ERROR_UNREF(s->cancel_self_error);
- GRPC_ERROR_UNREF(s->cancel_other_error);
-
- if (s->recv_inited) {
- grpc_slice_buffer_destroy_internal(&s->recv_message);
- }
-
- unref_transport(s->t);
-
- if (s->closure_at_destroy) {
- GRPC_CLOSURE_SCHED(s->closure_at_destroy, GRPC_ERROR_NONE);
- }
-}
-
-static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
- bool is_initial) {
+void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
+ bool is_initial) {
for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
md = md->next) {
char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
@@ -193,10 +289,10 @@ static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
}
}
-static grpc_error* fill_in_metadata(inproc_stream* s,
- const grpc_metadata_batch* metadata,
- uint32_t flags, grpc_metadata_batch* out_md,
- uint32_t* outflags, bool* markfilled) {
+grpc_error* fill_in_metadata(inproc_stream* s,
+ const grpc_metadata_batch* metadata,
+ uint32_t flags, grpc_metadata_batch* out_md,
+ uint32_t* outflags, bool* markfilled) {
if (grpc_inproc_trace.enabled()) {
log_metadata(metadata, s->t->is_client, outflags != nullptr);
}
@@ -221,109 +317,16 @@ static grpc_error* fill_in_metadata(inproc_stream* s,
return error;
}
-static int init_stream(grpc_transport* gt, grpc_stream* gs,
- grpc_stream_refcount* refcount, const void* server_data,
- gpr_arena* arena) {
+int init_stream(grpc_transport* gt, grpc_stream* gs,
+ grpc_stream_refcount* refcount, const void* server_data,
+ gpr_arena* arena) {
INPROC_LOG(GPR_INFO, "init_stream %p %p %p", gt, gs, server_data);
inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
- inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
- s->arena = arena;
-
- s->refs = refcount;
- // Ref this stream right now
- ref_stream(s, "inproc_init_stream:init");
-
- grpc_metadata_batch_init(&s->to_read_initial_md);
- s->to_read_initial_md_flags = 0;
- s->to_read_initial_md_filled = false;
- grpc_metadata_batch_init(&s->to_read_trailing_md);
- s->to_read_trailing_md_filled = false;
- grpc_metadata_batch_init(&s->write_buffer_initial_md);
- s->write_buffer_initial_md_flags = 0;
- s->write_buffer_initial_md_filled = false;
- grpc_metadata_batch_init(&s->write_buffer_trailing_md);
- s->write_buffer_trailing_md_filled = false;
- s->ops_needed = false;
- s->op_closure_scheduled = false;
- GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s,
- grpc_schedule_on_exec_ctx);
- s->t = t;
- s->closure_at_destroy = nullptr;
- s->other_side_closed = false;
-
- s->initial_md_sent = s->trailing_md_sent = s->initial_md_recvd =
- s->trailing_md_recvd = false;
-
- s->closed = false;
-
- s->cancel_self_error = GRPC_ERROR_NONE;
- s->cancel_other_error = GRPC_ERROR_NONE;
- s->write_buffer_cancel_error = GRPC_ERROR_NONE;
- s->deadline = GRPC_MILLIS_INF_FUTURE;
- s->write_buffer_deadline = GRPC_MILLIS_INF_FUTURE;
-
- s->stream_list_prev = nullptr;
- gpr_mu_lock(&t->mu->mu);
- s->listed = true;
- ref_stream(s, "inproc_init_stream:list");
- s->stream_list_next = t->stream_list;
- if (t->stream_list) {
- t->stream_list->stream_list_prev = s;
- }
- t->stream_list = s;
- gpr_mu_unlock(&t->mu->mu);
-
- if (!server_data) {
- ref_transport(t);
- inproc_transport* st = t->other_side;
- ref_transport(st);
- s->other_side = nullptr; // will get filled in soon
- // Pass the client-side stream address to the server-side for a ref
- ref_stream(s, "inproc_init_stream:clt"); // ref it now on behalf of server
- // side to avoid destruction
- INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p", st->accept_stream_cb,
- st->accept_stream_data);
- (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)s);
- } else {
- // This is the server-side and is being called through accept_stream_cb
- inproc_stream* cs = (inproc_stream*)server_data;
- s->other_side = cs;
- // Ref the server-side stream on behalf of the client now
- ref_stream(s, "inproc_init_stream:srv");
-
- // Now we are about to affect the other side, so lock the transport
- // to make sure that it doesn't get destroyed
- gpr_mu_lock(&s->t->mu->mu);
- cs->other_side = s;
- // Now transfer from the other side's write_buffer if any to the to_read
- // buffer
- if (cs->write_buffer_initial_md_filled) {
- fill_in_metadata(s, &cs->write_buffer_initial_md,
- cs->write_buffer_initial_md_flags,
- &s->to_read_initial_md, &s->to_read_initial_md_flags,
- &s->to_read_initial_md_filled);
- s->deadline = GPR_MIN(s->deadline, cs->write_buffer_deadline);
- grpc_metadata_batch_clear(&cs->write_buffer_initial_md);
- cs->write_buffer_initial_md_filled = false;
- }
- if (cs->write_buffer_trailing_md_filled) {
- fill_in_metadata(s, &cs->write_buffer_trailing_md, 0,
- &s->to_read_trailing_md, nullptr,
- &s->to_read_trailing_md_filled);
- grpc_metadata_batch_clear(&cs->write_buffer_trailing_md);
- cs->write_buffer_trailing_md_filled = false;
- }
- if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
- s->cancel_other_error = cs->write_buffer_cancel_error;
- cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
- }
-
- gpr_mu_unlock(&s->t->mu->mu);
- }
+ new (gs) inproc_stream(t, refcount, server_data, arena);
return 0; // return value is not important
}
-static void close_stream_locked(inproc_stream* s) {
+void close_stream_locked(inproc_stream* s) {
if (!s->closed) {
// Release the metadata that we would have written out
grpc_metadata_batch_destroy(&s->write_buffer_initial_md);
@@ -341,21 +344,21 @@ static void close_stream_locked(inproc_stream* s) {
n->stream_list_prev = p;
}
s->listed = false;
- unref_stream(s, "close_stream:list");
+ s->unref("close_stream:list");
}
s->closed = true;
- unref_stream(s, "close_stream:closing");
+ s->unref("close_stream:closing");
}
}
// This function means that we are done talking/listening to the other side
-static void close_other_side_locked(inproc_stream* s, const char* reason) {
+void close_other_side_locked(inproc_stream* s, const char* reason) {
if (s->other_side != nullptr) {
// First release the metadata that came from the other side's arena
grpc_metadata_batch_destroy(&s->to_read_initial_md);
grpc_metadata_batch_destroy(&s->to_read_trailing_md);
- unref_stream(s->other_side, reason);
+ s->other_side->unref(reason);
s->other_side_closed = true;
s->other_side = nullptr;
} else if (!s->other_side_closed) {
@@ -367,9 +370,9 @@ static void close_other_side_locked(inproc_stream* s, const char* reason) {
// this stream_op_batch is only one of the pending operations for this
// stream. This is called when one of the pending operations for the stream
// is done and about to be NULLed out
-static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
- grpc_transport_stream_op_batch* op,
- const char* msg) {
+void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
+ grpc_transport_stream_op_batch* op,
+ const char* msg) {
int is_sm = static_cast<int>(op == s->send_message_op);
int is_stm = static_cast<int>(op == s->send_trailing_md_op);
// TODO(vjpai): We should not consider the recv ops here, since they
@@ -386,8 +389,7 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
}
}
-static void maybe_schedule_op_closure_locked(inproc_stream* s,
- grpc_error* error) {
+void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) {
if (s && s->ops_needed && !s->op_closure_scheduled) {
GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error));
s->op_closure_scheduled = true;
@@ -395,7 +397,7 @@ static void maybe_schedule_op_closure_locked(inproc_stream* s,
}
}
-static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
+void fail_helper_locked(inproc_stream* s, grpc_error* error) {
INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
// If we're failing this side, we need to make sure that
// we also send or have already sent trailing metadata
@@ -525,8 +527,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
// that the incoming byte stream's next() call will always return
// synchronously. That assumption is true today but may not always be
// true in the future.
-static void message_transfer_locked(inproc_stream* sender,
- inproc_stream* receiver) {
+void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
size_t remaining =
sender->send_message_op->payload->send_message.send_message->length();
if (receiver->recv_inited) {
@@ -572,7 +573,7 @@ static void message_transfer_locked(inproc_stream* sender,
sender->send_message_op = nullptr;
}
-static void op_state_machine(void* arg, grpc_error* error) {
+void op_state_machine(void* arg, grpc_error* error) {
// This function gets called when we have contents in the unprocessed reads
// Get what we want based on our ops wanted
// Schedule our appropriate closures
@@ -607,10 +608,8 @@ static void op_state_machine(void* arg, grpc_error* error) {
if (other->recv_message_op) {
message_transfer_locked(s, other);
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
- } else if (!s->t->is_client &&
- (s->trailing_md_sent || other->recv_trailing_md_op)) {
- // A server send will never be matched if the client is waiting
- // for trailing metadata already
+ } else if (!s->t->is_client && s->trailing_md_sent) {
+ // A server send will never be matched if the server already sent status
s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, GRPC_ERROR_NONE, s->send_message_op,
@@ -621,11 +620,15 @@ static void op_state_machine(void* arg, grpc_error* error) {
// Pause a send trailing metadata if there is still an outstanding
// send message unless we know that the send message will never get
// matched to a receive. This happens on the client if the server has
- // already sent status.
+ // already sent status or on the server if the client has requested
+ // status
if (s->send_trailing_md_op &&
(!s->send_message_op ||
(s->t->is_client &&
- (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) {
+ (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
+ (!s->t->is_client && other &&
+ (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
+ other->recv_trailing_md_op)))) {
grpc_metadata_batch* dest = (other == nullptr)
? &s->write_buffer_trailing_md
: &other->to_read_trailing_md;
@@ -723,16 +726,6 @@ static void op_state_machine(void* arg, grpc_error* error) {
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
}
}
- if (s->recv_trailing_md_op && s->t->is_client && other &&
- other->send_message_op) {
- INPROC_LOG(GPR_INFO,
- "op_state_machine %p scheduling trailing-metadata-ready %p", s,
- GRPC_ERROR_NONE);
- GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready,
- GRPC_ERROR_NONE);
- maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
- }
if (s->to_read_trailing_md_filled) {
if (s->trailing_md_recvd) {
new_err =
@@ -748,6 +741,7 @@ static void op_state_machine(void* arg, grpc_error* error) {
if (s->recv_message_op != nullptr) {
// This message needs to be wrapped up because it will never be
// satisfied
+ *s->recv_message_op->payload->recv_message.recv_message = nullptr;
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
s->recv_message_op->payload->recv_message.recv_message_ready,
@@ -810,6 +804,7 @@ static void op_state_machine(void* arg, grpc_error* error) {
// No further message will come on this stream, so finish off the
// recv_message_op
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
+ *s->recv_message_op->payload->recv_message.recv_message = nullptr;
GRPC_CLOSURE_SCHED(
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
@@ -847,7 +842,7 @@ done:
GRPC_ERROR_UNREF(new_err);
}
-static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
+bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
bool ret = false; // was the cancel accepted
INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s, grpc_error_string(error));
if (s->cancel_self_error == GRPC_ERROR_NONE) {
@@ -900,10 +895,10 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
return ret;
}
-static void do_nothing(void* arg, grpc_error* error) {}
+void do_nothing(void* arg, grpc_error* error) {}
-static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
- grpc_transport_stream_op_batch* op) {
+void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
+ grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed
@@ -1012,18 +1007,18 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
}
// We want to initiate the closure if:
- // 1. We want to send a message and the other side wants to receive or end
+ // 1. We want to send a message and the other side wants to receive
// 2. We want to send trailing metadata and there isn't an unmatched send
+ // or the other side wants trailing metadata
// 3. We want initial metadata and the other side has sent it
// 4. We want to receive a message and there is a message ready
// 5. There is trailing metadata, even if nothing specifically wants
// that because that can shut down the receive message as well
- if ((op->send_message && other &&
- ((other->recv_message_op != nullptr) ||
- (other->recv_trailing_md_op != nullptr))) ||
- (op->send_trailing_metadata && !op->send_message) ||
+ if ((op->send_message && other && other->recv_message_op != nullptr) ||
+ (op->send_trailing_metadata &&
+ (!s->send_message_op || (other && other->recv_trailing_md_op))) ||
(op->recv_initial_metadata && s->to_read_initial_md_filled) ||
- (op->recv_message && other && (other->send_message_op != nullptr)) ||
+ (op->recv_message && other && other->send_message_op != nullptr) ||
(s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
if (!s->op_closure_scheduled) {
GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE);
@@ -1083,7 +1078,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_ERROR_UNREF(error);
}
-static void close_transport_locked(inproc_transport* t) {
+void close_transport_locked(inproc_transport* t) {
INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
grpc_connectivity_state_set(
&t->connectivity, GRPC_CHANNEL_SHUTDOWN,
@@ -1103,7 +1098,7 @@ static void close_transport_locked(inproc_transport* t) {
}
}
-static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
+void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
gpr_mu_lock(&t->mu->mu);
@@ -1136,39 +1131,64 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
gpr_mu_unlock(&t->mu->mu);
}
-static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
- grpc_closure* then_schedule_closure) {
+void destroy_stream(grpc_transport* gt, grpc_stream* gs,
+ grpc_closure* then_schedule_closure) {
INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
s->closure_at_destroy = then_schedule_closure;
- really_destroy_stream(s);
+ s->~inproc_stream();
}
-static void destroy_transport(grpc_transport* gt) {
+void destroy_transport(grpc_transport* gt) {
inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
INPROC_LOG(GPR_INFO, "destroy_transport %p", t);
gpr_mu_lock(&t->mu->mu);
close_transport_locked(t);
gpr_mu_unlock(&t->mu->mu);
- unref_transport(t->other_side);
- unref_transport(t);
+ t->other_side->unref();
+ t->unref();
}
/*******************************************************************************
* INTEGRATION GLUE
*/
-static void set_pollset(grpc_transport* gt, grpc_stream* gs,
- grpc_pollset* pollset) {
+void set_pollset(grpc_transport* gt, grpc_stream* gs, grpc_pollset* pollset) {
// Nothing to do here
}
-static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
- grpc_pollset_set* pollset_set) {
+void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
+ grpc_pollset_set* pollset_set) {
// Nothing to do here
}
-static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
+grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
+
+const grpc_transport_vtable inproc_vtable = {
+ sizeof(inproc_stream), "inproc", init_stream,
+ set_pollset, set_pollset_set, perform_stream_op,
+ perform_transport_op, destroy_stream, destroy_transport,
+ get_endpoint};
+
+/*******************************************************************************
+ * Main inproc transport functions
+ */
+void inproc_transports_create(grpc_transport** server_transport,
+ const grpc_channel_args* server_args,
+ grpc_transport** client_transport,
+ const grpc_channel_args* client_args) {
+ INPROC_LOG(GPR_INFO, "inproc_transports_create");
+ shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
+ inproc_transport* st = new (gpr_malloc(sizeof(*st)))
+ inproc_transport(&inproc_vtable, mu, /*is_client=*/false);
+ inproc_transport* ct = new (gpr_malloc(sizeof(*ct)))
+ inproc_transport(&inproc_vtable, mu, /*is_client=*/true);
+ st->other_side = ct;
+ ct->other_side = st;
+ *server_transport = reinterpret_cast<grpc_transport*>(st);
+ *client_transport = reinterpret_cast<grpc_transport*>(ct);
+}
+} // namespace
/*******************************************************************************
* GLOBAL INIT AND DESTROY
@@ -1190,48 +1210,6 @@ void grpc_inproc_transport_init(void) {
g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
}
-static const grpc_transport_vtable inproc_vtable = {
- sizeof(inproc_stream), "inproc", init_stream,
- set_pollset, set_pollset_set, perform_stream_op,
- perform_transport_op, destroy_stream, destroy_transport,
- get_endpoint};
-
-/*******************************************************************************
- * Main inproc transport functions
- */
-static void inproc_transports_create(grpc_transport** server_transport,
- const grpc_channel_args* server_args,
- grpc_transport** client_transport,
- const grpc_channel_args* client_args) {
- INPROC_LOG(GPR_INFO, "inproc_transports_create");
- inproc_transport* st =
- static_cast<inproc_transport*>(gpr_zalloc(sizeof(*st)));
- inproc_transport* ct =
- static_cast<inproc_transport*>(gpr_zalloc(sizeof(*ct)));
- // Share one lock between both sides since both sides get affected
- st->mu = ct->mu = static_cast<shared_mu*>(gpr_malloc(sizeof(*st->mu)));
- gpr_mu_init(&st->mu->mu);
- gpr_ref_init(&st->mu->refs, 2);
- st->base.vtable = &inproc_vtable;
- ct->base.vtable = &inproc_vtable;
- // Start each side of transport with 2 refs since they each have a ref
- // to the other
- gpr_ref_init(&st->refs, 2);
- gpr_ref_init(&ct->refs, 2);
- st->is_client = false;
- ct->is_client = true;
- grpc_connectivity_state_init(&st->connectivity, GRPC_CHANNEL_READY,
- "inproc_server");
- grpc_connectivity_state_init(&ct->connectivity, GRPC_CHANNEL_READY,
- "inproc_client");
- st->other_side = ct;
- ct->other_side = st;
- st->stream_list = nullptr;
- ct->stream_list = nullptr;
- *server_transport = reinterpret_cast<grpc_transport*>(st);
- *client_transport = reinterpret_cast<grpc_transport*>(ct);
-}
-
grpc_channel* grpc_inproc_channel_create(grpc_server* server,
grpc_channel_args* args,
void* reserved) {
@@ -1256,7 +1234,9 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server,
inproc_transports_create(&server_transport, server_args, &client_transport,
client_args);
- grpc_server_setup_transport(server, server_transport, nullptr, server_args);
+ // TODO(ncteisen): design and support channelz GetSocket for inproc.
+ grpc_server_setup_transport(server, server_transport, nullptr, server_args,
+ 0);
grpc_channel* channel = grpc_channel_create(
"inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);