aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/inproc/inproc_transport.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/inproc/inproc_transport.cc')
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc351
1 files changed, 159 insertions, 192 deletions
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 67a8358927..ba9aa503c8 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -54,8 +54,8 @@ typedef struct inproc_transport {
gpr_refcount refs;
bool is_client;
grpc_connectivity_state_tracker connectivity;
- void (*accept_stream_cb)(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_transport *transport, const void *server_data);
+ void (*accept_stream_cb)(void *user_data, grpc_transport *transport,
+ const void *server_data);
void *accept_stream_data;
bool is_closed;
struct inproc_transport *other_side;
@@ -118,39 +118,36 @@ typedef struct inproc_stream {
} inproc_stream;
static grpc_closure do_nothing_closure;
-static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
- grpc_error *error);
-static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
+static bool cancel_stream_locked(inproc_stream *s, grpc_error *error);
+static void op_state_machine(void *arg, grpc_error *error);
static void ref_transport(inproc_transport *t) {
INPROC_LOG(GPR_DEBUG, "ref_transport %p", t);
gpr_ref(&t->refs);
}
-static void really_destroy_transport(grpc_exec_ctx *exec_ctx,
- inproc_transport *t) {
+static void really_destroy_transport(inproc_transport *t) {
INPROC_LOG(GPR_DEBUG, "really_destroy_transport %p", t);
- grpc_connectivity_state_destroy(exec_ctx, &t->connectivity);
+ grpc_connectivity_state_destroy(&t->connectivity);
if (gpr_unref(&t->mu->refs)) {
gpr_free(t->mu);
}
gpr_free(t);
}
-static void unref_transport(grpc_exec_ctx *exec_ctx, inproc_transport *t) {
+static void unref_transport(inproc_transport *t) {
INPROC_LOG(GPR_DEBUG, "unref_transport %p", t);
if (gpr_unref(&t->refs)) {
- really_destroy_transport(exec_ctx, t);
+ really_destroy_transport(t);
}
}
#ifndef NDEBUG
#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
-#define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs, reason)
+#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
#else
#define STREAM_REF(refs, reason) grpc_stream_ref(refs)
-#define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs)
+#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
#endif
static void ref_stream(inproc_stream *s, const char *reason) {
@@ -158,13 +155,12 @@ static void ref_stream(inproc_stream *s, const char *reason) {
STREAM_REF(s->refs, reason);
}
-static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s,
- const char *reason) {
+static void unref_stream(inproc_stream *s, const char *reason) {
INPROC_LOG(GPR_DEBUG, "unref_stream %p %s", s, reason);
- STREAM_UNREF(exec_ctx, s->refs, reason);
+ STREAM_UNREF(s->refs, reason);
}
-static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
+static void really_destroy_stream(inproc_stream *s) {
INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s);
GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
@@ -172,13 +168,13 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
GRPC_ERROR_UNREF(s->cancel_other_error);
if (s->recv_inited) {
- grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message);
+ grpc_slice_buffer_destroy_internal(&s->recv_message);
}
- unref_transport(exec_ctx, s->t);
+ unref_transport(s->t);
if (s->closure_at_destroy) {
- GRPC_CLOSURE_SCHED(exec_ctx, s->closure_at_destroy, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(s->closure_at_destroy, GRPC_ERROR_NONE);
}
}
@@ -195,7 +191,7 @@ static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client,
}
}
-static grpc_error *fill_in_metadata(grpc_exec_ctx *exec_ctx, inproc_stream *s,
+static grpc_error *fill_in_metadata(inproc_stream *s,
const grpc_metadata_batch *metadata,
uint32_t flags, grpc_metadata_batch *out_md,
uint32_t *outflags, bool *markfilled) {
@@ -214,18 +210,18 @@ static grpc_error *fill_in_metadata(grpc_exec_ctx *exec_ctx, inproc_stream *s,
(elem != NULL) && (error == GRPC_ERROR_NONE); elem = elem->next) {
grpc_linked_mdelem *nelem =
(grpc_linked_mdelem *)gpr_arena_alloc(s->arena, sizeof(*nelem));
- nelem->md = grpc_mdelem_from_slices(
- exec_ctx, grpc_slice_intern(GRPC_MDKEY(elem->md)),
- grpc_slice_intern(GRPC_MDVALUE(elem->md)));
+ nelem->md =
+ grpc_mdelem_from_slices(grpc_slice_intern(GRPC_MDKEY(elem->md)),
+ grpc_slice_intern(GRPC_MDVALUE(elem->md)));
- error = grpc_metadata_batch_link_tail(exec_ctx, out_md, nelem);
+ error = grpc_metadata_batch_link_tail(out_md, nelem);
}
return error;
}
-static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_stream_refcount *refcount,
- const void *server_data, gpr_arena *arena) {
+static int init_stream(grpc_transport *gt, grpc_stream *gs,
+ grpc_stream_refcount *refcount, const void *server_data,
+ gpr_arena *arena) {
INPROC_LOG(GPR_DEBUG, "init_stream %p %p %p", gt, gs, server_data);
inproc_transport *t = (inproc_transport *)gt;
inproc_stream *s = (inproc_stream *)gs;
@@ -285,8 +281,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
// side to avoid destruction
INPROC_LOG(GPR_DEBUG, "calling accept stream cb %p %p",
st->accept_stream_cb, st->accept_stream_data);
- (*st->accept_stream_cb)(exec_ctx, st->accept_stream_data, &st->base,
- (void *)s);
+ (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void *)s);
} else {
// This is the server-side and is being called through accept_stream_cb
inproc_stream *cs = (inproc_stream *)server_data;
@@ -301,19 +296,19 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
// Now transfer from the other side's write_buffer if any to the to_read
// buffer
if (cs->write_buffer_initial_md_filled) {
- fill_in_metadata(exec_ctx, s, &cs->write_buffer_initial_md,
+ fill_in_metadata(s, &cs->write_buffer_initial_md,
cs->write_buffer_initial_md_flags,
&s->to_read_initial_md, &s->to_read_initial_md_flags,
&s->to_read_initial_md_filled);
s->deadline = GPR_MIN(s->deadline, cs->write_buffer_deadline);
- grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md);
+ grpc_metadata_batch_clear(&cs->write_buffer_initial_md);
cs->write_buffer_initial_md_filled = false;
}
if (cs->write_buffer_trailing_md_filled) {
- fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0,
+ fill_in_metadata(s, &cs->write_buffer_trailing_md, 0,
&s->to_read_trailing_md, NULL,
&s->to_read_trailing_md_filled);
- grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_trailing_md);
+ grpc_metadata_batch_clear(&cs->write_buffer_trailing_md);
cs->write_buffer_trailing_md_filled = false;
}
if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
@@ -326,11 +321,11 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
return 0; // return value is not important
}
-static void close_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
+static void close_stream_locked(inproc_stream *s) {
if (!s->closed) {
// Release the metadata that we would have written out
- grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_initial_md);
- grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_trailing_md);
+ grpc_metadata_batch_destroy(&s->write_buffer_initial_md);
+ grpc_metadata_batch_destroy(&s->write_buffer_trailing_md);
if (s->listed) {
inproc_stream *p = s->stream_list_prev;
@@ -344,22 +339,21 @@ static void close_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
n->stream_list_prev = p;
}
s->listed = false;
- unref_stream(exec_ctx, s, "close_stream:list");
+ unref_stream(s, "close_stream:list");
}
s->closed = true;
- unref_stream(exec_ctx, s, "close_stream:closing");
+ unref_stream(s, "close_stream:closing");
}
}
// This function means that we are done talking/listening to the other side
-static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
- const char *reason) {
+static void close_other_side_locked(inproc_stream *s, const char *reason) {
if (s->other_side != NULL) {
// First release the metadata that came from the other side's arena
- grpc_metadata_batch_destroy(exec_ctx, &s->to_read_initial_md);
- grpc_metadata_batch_destroy(exec_ctx, &s->to_read_trailing_md);
+ grpc_metadata_batch_destroy(&s->to_read_initial_md);
+ grpc_metadata_batch_destroy(&s->to_read_trailing_md);
- unref_stream(exec_ctx, s->other_side, reason);
+ unref_stream(s->other_side, reason);
s->other_side_closed = true;
s->other_side = NULL;
} else if (!s->other_side_closed) {
@@ -371,8 +365,7 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
// this stream_op_batch is only one of the pending operations for this
// stream. This is called when one of the pending operations for the stream
// is done and about to be NULLed out
-static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx,
- inproc_stream *s, grpc_error *error,
+static void complete_if_batch_end_locked(inproc_stream *s, grpc_error *error,
grpc_transport_stream_op_batch *op,
const char *msg) {
int is_sm = (int)(op == s->send_message_op);
@@ -383,22 +376,20 @@ static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx,
if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error);
- GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_REF(error));
}
}
-static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx,
- inproc_stream *s,
+static void maybe_schedule_op_closure_locked(inproc_stream *s,
grpc_error *error) {
if (s && s->ops_needed && !s->op_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error));
s->op_closure_scheduled = true;
s->ops_needed = false;
}
}
-static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
- grpc_error *error) {
+static void fail_helper_locked(inproc_stream *s, grpc_error *error) {
INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s);
// If we're failing this side, we need to make sure that
// we also send or have already sent trailing metadata
@@ -414,14 +405,14 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
: &other->to_read_trailing_md;
bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
: &other->to_read_trailing_md_filled;
- fill_in_metadata(exec_ctx, s, &fake_md, 0, dest, NULL, destfilled);
- grpc_metadata_batch_destroy(exec_ctx, &fake_md);
+ fill_in_metadata(s, &fake_md, 0, dest, NULL, destfilled);
+ grpc_metadata_batch_destroy(&fake_md);
if (other != NULL) {
if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(error);
}
- maybe_schedule_op_closure_locked(exec_ctx, other, error);
+ maybe_schedule_op_closure_locked(other, error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
}
@@ -435,24 +426,21 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_metadata_batch_init(&fake_md);
grpc_linked_mdelem *path_md =
(grpc_linked_mdelem *)gpr_arena_alloc(s->arena, sizeof(*path_md));
- path_md->md =
- grpc_mdelem_from_slices(exec_ctx, g_fake_path_key, g_fake_path_value);
- GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, path_md) ==
+ path_md->md = grpc_mdelem_from_slices(g_fake_path_key, g_fake_path_value);
+ GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, path_md) ==
GRPC_ERROR_NONE);
grpc_linked_mdelem *auth_md =
(grpc_linked_mdelem *)gpr_arena_alloc(s->arena, sizeof(*auth_md));
- auth_md->md =
- grpc_mdelem_from_slices(exec_ctx, g_fake_auth_key, g_fake_auth_value);
- GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, auth_md) ==
+ auth_md->md = grpc_mdelem_from_slices(g_fake_auth_key, g_fake_auth_value);
+ GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, auth_md) ==
GRPC_ERROR_NONE);
fill_in_metadata(
- exec_ctx, s, &fake_md, 0,
- s->recv_initial_md_op->payload->recv_initial_metadata
- .recv_initial_metadata,
+ s, &fake_md, 0, s->recv_initial_md_op->payload->recv_initial_metadata
+ .recv_initial_metadata,
s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
NULL);
- grpc_metadata_batch_destroy(exec_ctx, &fake_md);
+ grpc_metadata_batch_destroy(&fake_md);
err = GRPC_ERROR_NONE;
} else {
err = GRPC_ERROR_REF(error);
@@ -460,14 +448,13 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
INPROC_LOG(GPR_DEBUG,
"fail_helper %p scheduling initial-metadata-ready %p %p", s,
error, err);
- GRPC_CLOSURE_SCHED(exec_ctx,
- s->recv_initial_md_op->payload->recv_initial_metadata
+ GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
err);
// Last use of err so no need to REF and then UNREF it
complete_if_batch_end_locked(
- exec_ctx, s, error, s->recv_initial_md_op,
+ s, error, s->recv_initial_md_op,
"fail_helper scheduling recv-initial-metadata-on-complete");
s->recv_initial_md_op = NULL;
}
@@ -475,22 +462,22 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-ready %p", s,
error);
GRPC_CLOSURE_SCHED(
- exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
+ s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
complete_if_batch_end_locked(
- exec_ctx, s, error, s->recv_message_op,
+ s, error, s->recv_message_op,
"fail_helper scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
if (s->send_message_op) {
complete_if_batch_end_locked(
- exec_ctx, s, error, s->send_message_op,
+ s, error, s->send_message_op,
"fail_helper scheduling send-message-on-complete");
s->send_message_op = NULL;
}
if (s->send_trailing_md_op) {
complete_if_batch_end_locked(
- exec_ctx, s, error, s->send_trailing_md_op,
+ s, error, s->send_trailing_md_op,
"fail_helper scheduling send-trailng-md-on-complete");
s->send_trailing_md_op = NULL;
}
@@ -499,23 +486,22 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
"fail_helper %p scheduling trailing-md-on-complete %p", s,
error);
complete_if_batch_end_locked(
- exec_ctx, s, error, s->recv_trailing_md_op,
+ s, error, s->recv_trailing_md_op,
"fail_helper scheduling recv-trailing-metadata-on-complete");
s->recv_trailing_md_op = NULL;
}
- close_other_side_locked(exec_ctx, s, "fail_helper:other_side");
- close_stream_locked(exec_ctx, s);
+ close_other_side_locked(s, "fail_helper:other_side");
+ close_stream_locked(s);
GRPC_ERROR_UNREF(error);
}
-static void message_transfer_locked(grpc_exec_ctx *exec_ctx,
- inproc_stream *sender,
+static void message_transfer_locked(inproc_stream *sender,
inproc_stream *receiver) {
size_t remaining =
sender->send_message_op->payload->send_message.send_message->length;
if (receiver->recv_inited) {
- grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message);
+ grpc_slice_buffer_destroy_internal(&receiver->recv_message);
}
grpc_slice_buffer_init(&receiver->recv_message);
receiver->recv_inited = true;
@@ -523,13 +509,13 @@ static void message_transfer_locked(grpc_exec_ctx *exec_ctx,
grpc_slice message_slice;
grpc_closure unused;
GPR_ASSERT(grpc_byte_stream_next(
- exec_ctx, sender->send_message_op->payload->send_message.send_message,
- SIZE_MAX, &unused));
+ sender->send_message_op->payload->send_message.send_message, SIZE_MAX,
+ &unused));
grpc_error *error = grpc_byte_stream_pull(
- exec_ctx, sender->send_message_op->payload->send_message.send_message,
+ sender->send_message_op->payload->send_message.send_message,
&message_slice);
if (error != GRPC_ERROR_NONE) {
- cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error));
+ cancel_stream_locked(sender, GRPC_ERROR_REF(error));
break;
}
GPR_ASSERT(error == GRPC_ERROR_NONE);
@@ -544,22 +530,20 @@ static void message_transfer_locked(grpc_exec_ctx *exec_ctx,
INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready",
receiver);
GRPC_CLOSURE_SCHED(
- exec_ctx,
receiver->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
- exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op,
+ sender, GRPC_ERROR_NONE, sender->send_message_op,
"message_transfer scheduling sender on_complete");
complete_if_batch_end_locked(
- exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
+ receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
"message_transfer scheduling receiver on_complete");
receiver->recv_message_op = NULL;
sender->send_message_op = NULL;
}
-static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void op_state_machine(void *arg, grpc_error *error) {
// This function gets called when we have contents in the unprocessed reads
// Get what we want based on our ops wanted
// Schedule our appropriate closures
@@ -580,26 +564,26 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
inproc_stream *other = s->other_side;
if (s->cancel_self_error != GRPC_ERROR_NONE) {
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error));
+ fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_self_error));
goto done;
} else if (s->cancel_other_error != GRPC_ERROR_NONE) {
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_other_error));
+ fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_other_error));
goto done;
} else if (error != GRPC_ERROR_NONE) {
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(error));
+ fail_helper_locked(s, GRPC_ERROR_REF(error));
goto done;
}
if (s->send_message_op && other) {
if (other->recv_message_op) {
- message_transfer_locked(exec_ctx, s, other);
- maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
+ message_transfer_locked(s, other);
+ maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
} else if (!s->t->is_client &&
(s->trailing_md_sent || other->recv_trailing_md_op)) {
// A server send will never be matched if the client is waiting
// for trailing metadata already
complete_if_batch_end_locked(
- exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op,
+ s, GRPC_ERROR_NONE, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
s->send_message_op = NULL;
}
@@ -620,28 +604,27 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// The buffer is already in use; that's an error!
INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
+ fail_helper_locked(s, GRPC_ERROR_REF(new_err));
goto done;
} else {
if (other && !other->closed) {
- fill_in_metadata(exec_ctx, s,
- s->send_trailing_md_op->payload->send_trailing_metadata
- .send_trailing_metadata,
+ fill_in_metadata(s, s->send_trailing_md_op->payload
+ ->send_trailing_metadata.send_trailing_metadata,
0, dest, NULL, destfilled);
}
s->trailing_md_sent = true;
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG,
"op_state_machine %p scheduling trailing-md-on-complete", s);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
s->recv_trailing_md_op = NULL;
needs_close = true;
}
}
- maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
+ maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
complete_if_batch_end_locked(
- exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op,
+ s, GRPC_ERROR_NONE, s->send_trailing_md_op,
"op_state_machine scheduling send-trailing-metadata-on-complete");
s->send_trailing_md_op = NULL;
}
@@ -654,31 +637,30 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
"op_state_machine %p scheduling on_complete errors for already "
"recvd initial md %p",
s, new_err);
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
+ fail_helper_locked(s, GRPC_ERROR_REF(new_err));
goto done;
}
if (s->to_read_initial_md_filled) {
s->initial_md_recvd = true;
new_err = fill_in_metadata(
- exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags,
+ s, &s->to_read_initial_md, s->to_read_initial_md_flags,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata,
s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
NULL);
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata->deadline = s->deadline;
- grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
+ grpc_metadata_batch_clear(&s->to_read_initial_md);
s->to_read_initial_md_filled = false;
INPROC_LOG(GPR_DEBUG,
"op_state_machine %p scheduling initial-metadata-ready %p", s,
new_err);
- GRPC_CLOSURE_SCHED(exec_ctx,
- s->recv_initial_md_op->payload->recv_initial_metadata
+ GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
GRPC_ERROR_REF(new_err));
complete_if_batch_end_locked(
- exec_ctx, s, new_err, s->recv_initial_md_op,
+ s, new_err, s->recv_initial_md_op,
"op_state_machine scheduling recv-initial-metadata-on-complete");
s->recv_initial_md_op = NULL;
@@ -686,20 +668,20 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
INPROC_LOG(GPR_DEBUG,
"op_state_machine %p scheduling on_complete errors2 %p", s,
new_err);
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
+ fail_helper_locked(s, GRPC_ERROR_REF(new_err));
goto done;
}
}
}
if (s->recv_message_op) {
if (other && other->send_message_op) {
- message_transfer_locked(exec_ctx, other, s);
- maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
+ message_transfer_locked(other, s);
+ maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
}
}
if (s->recv_trailing_md_op && s->t->is_client && other &&
other->send_message_op) {
- maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
+ maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
}
if (s->to_read_trailing_md_filled) {
if (s->trailing_md_recvd) {
@@ -710,7 +692,7 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
"op_state_machine %p scheduling on_complete errors for already "
"recvd trailing md %p",
s, new_err);
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
+ fail_helper_locked(s, GRPC_ERROR_REF(new_err));
goto done;
}
if (s->recv_message_op != NULL) {
@@ -718,11 +700,10 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// satisfied
INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
- exec_ctx,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
- exec_ctx, s, new_err, s->recv_message_op,
+ s, new_err, s->recv_message_op,
"op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
@@ -730,7 +711,7 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
complete_if_batch_end_locked(
- exec_ctx, s, new_err, s->send_message_op,
+ s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
s->send_message_op = NULL;
}
@@ -738,11 +719,11 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// We wanted trailing metadata and we got it
s->trailing_md_recvd = true;
new_err =
- fill_in_metadata(exec_ctx, s, &s->to_read_trailing_md, 0,
+ fill_in_metadata(s, &s->to_read_trailing_md, 0,
s->recv_trailing_md_op->payload
->recv_trailing_metadata.recv_trailing_metadata,
NULL, NULL);
- grpc_metadata_batch_clear(exec_ctx, &s->to_read_trailing_md);
+ grpc_metadata_batch_clear(&s->to_read_trailing_md);
s->to_read_trailing_md_filled = false;
// We should schedule the recv_trailing_md_op completion if
@@ -754,7 +735,7 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
INPROC_LOG(GPR_DEBUG,
"op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = NULL;
needs_close = true;
@@ -775,10 +756,10 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// recv_message_op
INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
- exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
+ s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
- exec_ctx, s, new_err, s->recv_message_op,
+ s, new_err, s->recv_message_op,
"op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
@@ -787,7 +768,7 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
complete_if_batch_end_locked(
- exec_ctx, s, new_err, s->send_message_op,
+ s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
s->send_message_op = NULL;
}
@@ -803,22 +784,21 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
}
done:
if (needs_close) {
- close_other_side_locked(exec_ctx, s, "op_state_machine");
- close_stream_locked(exec_ctx, s);
+ close_other_side_locked(s, "op_state_machine");
+ close_stream_locked(s);
}
gpr_mu_unlock(mu);
GRPC_ERROR_UNREF(new_err);
}
-static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
- grpc_error *error) {
+static bool cancel_stream_locked(inproc_stream *s, grpc_error *error) {
bool ret = false; // was the cancel accepted
INPROC_LOG(GPR_DEBUG, "cancel_stream %p with %s", s,
grpc_error_string(error));
if (s->cancel_self_error == GRPC_ERROR_NONE) {
ret = true;
s->cancel_self_error = GRPC_ERROR_REF(error);
- maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error);
+ maybe_schedule_op_closure_locked(s, s->cancel_self_error);
// Send trailing md to the other side indicating cancellation, even if we
// already have
s->trailing_md_sent = true;
@@ -831,15 +811,14 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
: &other->to_read_trailing_md;
bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
: &other->to_read_trailing_md_filled;
- fill_in_metadata(exec_ctx, s, &cancel_md, 0, dest, NULL, destfilled);
- grpc_metadata_batch_destroy(exec_ctx, &cancel_md);
+ fill_in_metadata(s, &cancel_md, 0, dest, NULL, destfilled);
+ grpc_metadata_batch_destroy(&cancel_md);
if (other != NULL) {
if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
}
- maybe_schedule_op_closure_locked(exec_ctx, other,
- other->cancel_other_error);
+ maybe_schedule_op_closure_locked(other, other->cancel_other_error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
}
@@ -849,21 +828,20 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
complete_if_batch_end_locked(
- exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op,
+ s, s->cancel_self_error, s->recv_trailing_md_op,
"cancel_stream scheduling trailing-md-on-complete");
s->recv_trailing_md_op = NULL;
}
}
- close_other_side_locked(exec_ctx, s, "cancel_stream:other_side");
- close_stream_locked(exec_ctx, s);
+ close_other_side_locked(s, "cancel_stream:other_side");
+ close_stream_locked(s);
GRPC_ERROR_UNREF(error);
return ret;
}
-static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs,
+static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
grpc_transport_stream_op_batch *op) {
INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %p %p", gt, gs, op);
inproc_stream *s = (inproc_stream *)gs;
@@ -889,7 +867,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
if (op->cancel_stream) {
// Call cancel_stream_locked without ref'ing the cancel_error because
// this function is responsible to make sure that that field gets unref'ed
- cancel_stream_locked(exec_ctx, s, op->payload->cancel_stream.cancel_error);
+ cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
// this op can complete without an error
} else if (s->cancel_self_error != GRPC_ERROR_NONE) {
// already self-canceled so still give it an error
@@ -927,8 +905,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
} else {
if (!other->closed) {
fill_in_metadata(
- exec_ctx, s,
- op->payload->send_initial_metadata.send_initial_metadata,
+ s, op->payload->send_initial_metadata.send_initial_metadata,
op->payload->send_initial_metadata.send_initial_metadata_flags,
dest, destflags, destfilled);
}
@@ -940,7 +917,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->initial_md_sent = true;
}
}
- maybe_schedule_op_closure_locked(exec_ctx, other, error);
+ maybe_schedule_op_closure_locked(other, error);
}
}
@@ -979,7 +956,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
(op->recv_message && (other && other->send_message_op != NULL)) ||
(s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
if (!s->op_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE);
s->op_closure_scheduled = true;
}
} else {
@@ -994,7 +971,6 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
"perform_stream_op error %p scheduling initial-metadata-ready %p",
s, error);
GRPC_CLOSURE_SCHED(
- exec_ctx,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
@@ -1003,28 +979,26 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_DEBUG,
"perform_stream_op error %p scheduling recv message-ready %p", s,
error);
- GRPC_CLOSURE_SCHED(exec_ctx,
- op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
}
INPROC_LOG(GPR_DEBUG, "perform_stream_op %p scheduling on_complete %p", s,
error);
- GRPC_CLOSURE_SCHED(exec_ctx, on_complete, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
}
if (needs_close) {
- close_other_side_locked(exec_ctx, s, "perform_stream_op:other_side");
- close_stream_locked(exec_ctx, s);
+ close_other_side_locked(s, "perform_stream_op:other_side");
+ close_stream_locked(s);
}
gpr_mu_unlock(mu);
GRPC_ERROR_UNREF(error);
}
-static void close_transport_locked(grpc_exec_ctx *exec_ctx,
- inproc_transport *t) {
+static void close_transport_locked(inproc_transport *t) {
INPROC_LOG(GPR_DEBUG, "close_transport %p %d", t, t->is_closed);
grpc_connectivity_state_set(
- exec_ctx, &t->connectivity, GRPC_CHANNEL_SHUTDOWN,
+ &t->connectivity, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Closing transport."),
"close transport");
if (!t->is_closed) {
@@ -1033,7 +1007,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
while (t->stream_list != NULL) {
// cancel_stream_locked also adjusts stream list
cancel_stream_locked(
- exec_ctx, t->stream_list,
+ t->stream_list,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
@@ -1041,14 +1015,13 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
}
}
-static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_transport_op *op) {
+static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
inproc_transport *t = (inproc_transport *)gt;
INPROC_LOG(GPR_DEBUG, "perform_transport_op %p %p", t, op);
gpr_mu_lock(&t->mu->mu);
if (op->on_connectivity_state_change) {
grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &t->connectivity, op->connectivity_state,
+ &t->connectivity, op->connectivity_state,
op->on_connectivity_state_change);
}
if (op->set_accept_stream) {
@@ -1056,7 +1029,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
t->accept_stream_data = op->set_accept_stream_user_data;
}
if (op->on_consumed) {
- GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}
bool do_close = false;
@@ -1070,71 +1043,68 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
if (do_close) {
- close_transport_locked(exec_ctx, t);
+ close_transport_locked(t);
}
gpr_mu_unlock(&t->mu->mu);
}
-static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs,
+static void destroy_stream(grpc_transport *gt, grpc_stream *gs,
grpc_closure *then_schedule_closure) {
INPROC_LOG(GPR_DEBUG, "destroy_stream %p %p", gs, then_schedule_closure);
inproc_stream *s = (inproc_stream *)gs;
s->closure_at_destroy = then_schedule_closure;
- really_destroy_stream(exec_ctx, s);
+ really_destroy_stream(s);
}
-static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+static void destroy_transport(grpc_transport *gt) {
inproc_transport *t = (inproc_transport *)gt;
INPROC_LOG(GPR_DEBUG, "destroy_transport %p", t);
gpr_mu_lock(&t->mu->mu);
- close_transport_locked(exec_ctx, t);
+ close_transport_locked(t);
gpr_mu_unlock(&t->mu->mu);
- unref_transport(exec_ctx, t->other_side);
- unref_transport(exec_ctx, t);
+ unref_transport(t->other_side);
+ unref_transport(t);
}
/*******************************************************************************
* INTEGRATION GLUE
*/
-static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_pollset *pollset) {
+static void set_pollset(grpc_transport *gt, grpc_stream *gs,
+ grpc_pollset *pollset) {
// Nothing to do here
}
-static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_pollset_set *pollset_set) {
+static void set_pollset_set(grpc_transport *gt, grpc_stream *gs,
+ grpc_pollset_set *pollset_set) {
// Nothing to do here
}
-static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
- return NULL;
-}
+static grpc_endpoint *get_endpoint(grpc_transport *t) { return NULL; }
/*******************************************************************************
* GLOBAL INIT AND DESTROY
*/
-static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
+static void do_nothing(void *arg, grpc_error *error) {}
void grpc_inproc_transport_init(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL,
grpc_schedule_on_exec_ctx);
g_empty_slice = grpc_slice_from_static_buffer(NULL, 0);
grpc_slice key_tmp = grpc_slice_from_static_string(":path");
g_fake_path_key = grpc_slice_intern(key_tmp);
- grpc_slice_unref_internal(&exec_ctx, key_tmp);
+ grpc_slice_unref_internal(key_tmp);
g_fake_path_value = grpc_slice_from_static_string("/");
grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
g_fake_auth_key = grpc_slice_intern(auth_tmp);
- grpc_slice_unref_internal(&exec_ctx, auth_tmp);
+ grpc_slice_unref_internal(auth_tmp);
g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_exec_ctx_finish();
}
static const grpc_transport_vtable inproc_vtable = {
@@ -1146,8 +1116,7 @@ static const grpc_transport_vtable inproc_vtable = {
/*******************************************************************************
* Main inproc transport functions
*/
-static void inproc_transports_create(grpc_exec_ctx *exec_ctx,
- grpc_transport **server_transport,
+static void inproc_transports_create(grpc_transport **server_transport,
const grpc_channel_args *server_args,
grpc_transport **client_transport,
const grpc_channel_args *client_args) {
@@ -1184,7 +1153,7 @@ grpc_channel *grpc_inproc_channel_create(grpc_server *server,
GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
(server, args));
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
const grpc_channel_args *server_args = grpc_server_get_channel_args(server);
@@ -1199,30 +1168,28 @@ grpc_channel *grpc_inproc_channel_create(grpc_server *server,
grpc_transport *server_transport;
grpc_transport *client_transport;
- inproc_transports_create(&exec_ctx, &server_transport, server_args,
- &client_transport, client_args);
+ inproc_transports_create(&server_transport, server_args, &client_transport,
+ client_args);
- grpc_server_setup_transport(&exec_ctx, server, server_transport, NULL,
- server_args);
- grpc_channel *channel =
- grpc_channel_create(&exec_ctx, "inproc", client_args,
- GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
+ grpc_server_setup_transport(server, server_transport, NULL, server_args);
+ grpc_channel *channel = grpc_channel_create(
+ "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
// Free up created channel args
- grpc_channel_args_destroy(&exec_ctx, client_args);
+ grpc_channel_args_destroy(client_args);
// Now finish scheduled operations
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_exec_ctx_finish();
return channel;
}
void grpc_inproc_transport_shutdown(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_slice_unref_internal(&exec_ctx, g_empty_slice);
- grpc_slice_unref_internal(&exec_ctx, g_fake_path_key);
- grpc_slice_unref_internal(&exec_ctx, g_fake_path_value);
- grpc_slice_unref_internal(&exec_ctx, g_fake_auth_key);
- grpc_slice_unref_internal(&exec_ctx, g_fake_auth_value);
- grpc_exec_ctx_finish(&exec_ctx);
+ ExecCtx _local_exec_ctx;
+ grpc_slice_unref_internal(g_empty_slice);
+ grpc_slice_unref_internal(g_fake_path_key);
+ grpc_slice_unref_internal(g_fake_path_value);
+ grpc_slice_unref_internal(g_fake_auth_key);
+ grpc_slice_unref_internal(g_fake_auth_value);
+ grpc_exec_ctx_finish();
}