From 8cf1470a51ea276ca84825e7495d4ee24743540d Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 6 Dec 2017 09:47:54 -0800 Subject: Revert "Revert "All instances of exec_ctx being passed around in src/core removed"" --- src/core/ext/transport/inproc/inproc_transport.cc | 342 ++++++++++------------ 1 file changed, 154 insertions(+), 188 deletions(-) (limited to 'src/core/ext/transport/inproc') diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index d8d753e459..8dd0b7dce2 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -54,8 +54,8 @@ typedef struct inproc_transport { gpr_refcount refs; bool is_client; grpc_connectivity_state_tracker connectivity; - void (*accept_stream_cb)(grpc_exec_ctx* exec_ctx, void* user_data, - grpc_transport* transport, const void* server_data); + void (*accept_stream_cb)(void* user_data, grpc_transport* transport, + const void* server_data); void* accept_stream_data; bool is_closed; struct inproc_transport* other_side; @@ -118,39 +118,36 @@ typedef struct inproc_stream { } inproc_stream; static grpc_closure do_nothing_closure; -static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, - grpc_error* error); -static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error); +static bool cancel_stream_locked(inproc_stream* s, grpc_error* error); +static void op_state_machine(void* arg, grpc_error* error); static void ref_transport(inproc_transport* t) { INPROC_LOG(GPR_DEBUG, "ref_transport %p", t); gpr_ref(&t->refs); } -static void really_destroy_transport(grpc_exec_ctx* exec_ctx, - inproc_transport* t) { +static void really_destroy_transport(inproc_transport* t) { INPROC_LOG(GPR_DEBUG, "really_destroy_transport %p", t); - grpc_connectivity_state_destroy(exec_ctx, &t->connectivity); + grpc_connectivity_state_destroy(&t->connectivity); if (gpr_unref(&t->mu->refs)) { gpr_free(t->mu); } gpr_free(t); } -static void unref_transport(grpc_exec_ctx* exec_ctx, inproc_transport* t) { +static void unref_transport(inproc_transport* t) { INPROC_LOG(GPR_DEBUG, "unref_transport %p", t); if (gpr_unref(&t->refs)) { - really_destroy_transport(exec_ctx, t); + really_destroy_transport(t); } } #ifndef NDEBUG #define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason) -#define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs, reason) +#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason) #else #define STREAM_REF(refs, reason) grpc_stream_ref(refs) -#define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs) +#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs) #endif static void ref_stream(inproc_stream* s, const char* reason) { @@ -158,13 +155,12 @@ static void ref_stream(inproc_stream* s, const char* reason) { STREAM_REF(s->refs, reason); } -static void unref_stream(grpc_exec_ctx* exec_ctx, inproc_stream* s, - const char* reason) { +static void unref_stream(inproc_stream* s, const char* reason) { INPROC_LOG(GPR_DEBUG, "unref_stream %p %s", s, reason); - STREAM_UNREF(exec_ctx, s->refs, reason); + STREAM_UNREF(s->refs, reason); } -static void really_destroy_stream(grpc_exec_ctx* exec_ctx, inproc_stream* s) { +static void really_destroy_stream(inproc_stream* s) { INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s); GRPC_ERROR_UNREF(s->write_buffer_cancel_error); @@ -172,13 +168,13 @@ static void really_destroy_stream(grpc_exec_ctx* exec_ctx, inproc_stream* s) { GRPC_ERROR_UNREF(s->cancel_other_error); if (s->recv_inited) { - grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message); + grpc_slice_buffer_destroy_internal(&s->recv_message); } - unref_transport(exec_ctx, s->t); + unref_transport(s->t); if (s->closure_at_destroy) { - GRPC_CLOSURE_SCHED(exec_ctx, s->closure_at_destroy, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(s->closure_at_destroy, GRPC_ERROR_NONE); } } @@ -195,7 +191,7 @@ static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, } } -static grpc_error* fill_in_metadata(grpc_exec_ctx* exec_ctx, inproc_stream* s, +static grpc_error* fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata, uint32_t flags, grpc_metadata_batch* out_md, uint32_t* outflags, bool* markfilled) { @@ -214,18 +210,18 @@ static grpc_error* fill_in_metadata(grpc_exec_ctx* exec_ctx, inproc_stream* s, (elem != nullptr) && (error == GRPC_ERROR_NONE); elem = elem->next) { grpc_linked_mdelem* nelem = (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*nelem)); - nelem->md = grpc_mdelem_from_slices( - exec_ctx, grpc_slice_intern(GRPC_MDKEY(elem->md)), - grpc_slice_intern(GRPC_MDVALUE(elem->md))); + nelem->md = + grpc_mdelem_from_slices(grpc_slice_intern(GRPC_MDKEY(elem->md)), + grpc_slice_intern(GRPC_MDVALUE(elem->md))); - error = grpc_metadata_batch_link_tail(exec_ctx, out_md, nelem); + error = grpc_metadata_batch_link_tail(out_md, nelem); } return error; } -static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, - grpc_stream* gs, grpc_stream_refcount* refcount, - const void* server_data, gpr_arena* arena) { +static int init_stream(grpc_transport* gt, grpc_stream* gs, + grpc_stream_refcount* refcount, const void* server_data, + gpr_arena* arena) { INPROC_LOG(GPR_DEBUG, "init_stream %p %p %p", gt, gs, server_data); inproc_transport* t = (inproc_transport*)gt; inproc_stream* s = (inproc_stream*)gs; @@ -285,8 +281,7 @@ static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, // side to avoid destruction INPROC_LOG(GPR_DEBUG, "calling accept stream cb %p %p", st->accept_stream_cb, st->accept_stream_data); - (*st->accept_stream_cb)(exec_ctx, st->accept_stream_data, &st->base, - (void*)s); + (*st->accept_stream_cb)(st->accept_stream_data, &st->base, (void*)s); } else { // This is the server-side and is being called through accept_stream_cb inproc_stream* cs = (inproc_stream*)server_data; @@ -301,19 +296,19 @@ static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, // Now transfer from the other side's write_buffer if any to the to_read // buffer if (cs->write_buffer_initial_md_filled) { - fill_in_metadata(exec_ctx, s, &cs->write_buffer_initial_md, + fill_in_metadata(s, &cs->write_buffer_initial_md, cs->write_buffer_initial_md_flags, &s->to_read_initial_md, &s->to_read_initial_md_flags, &s->to_read_initial_md_filled); s->deadline = GPR_MIN(s->deadline, cs->write_buffer_deadline); - grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md); + grpc_metadata_batch_clear(&cs->write_buffer_initial_md); cs->write_buffer_initial_md_filled = false; } if (cs->write_buffer_trailing_md_filled) { - fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0, + fill_in_metadata(s, &cs->write_buffer_trailing_md, 0, &s->to_read_trailing_md, nullptr, &s->to_read_trailing_md_filled); - grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_trailing_md); + grpc_metadata_batch_clear(&cs->write_buffer_trailing_md); cs->write_buffer_trailing_md_filled = false; } if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) { @@ -326,11 +321,11 @@ static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, return 0; // return value is not important } -static void close_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s) { +static void close_stream_locked(inproc_stream* s) { if (!s->closed) { // Release the metadata that we would have written out - grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_initial_md); - grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_trailing_md); + grpc_metadata_batch_destroy(&s->write_buffer_initial_md); + grpc_metadata_batch_destroy(&s->write_buffer_trailing_md); if (s->listed) { inproc_stream* p = s->stream_list_prev; @@ -344,22 +339,21 @@ static void close_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s) { n->stream_list_prev = p; } s->listed = false; - unref_stream(exec_ctx, s, "close_stream:list"); + unref_stream(s, "close_stream:list"); } s->closed = true; - unref_stream(exec_ctx, s, "close_stream:closing"); + unref_stream(s, "close_stream:closing"); } } // This function means that we are done talking/listening to the other side -static void close_other_side_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, - const char* reason) { +static void close_other_side_locked(inproc_stream* s, const char* reason) { if (s->other_side != nullptr) { // First release the metadata that came from the other side's arena - grpc_metadata_batch_destroy(exec_ctx, &s->to_read_initial_md); - grpc_metadata_batch_destroy(exec_ctx, &s->to_read_trailing_md); + grpc_metadata_batch_destroy(&s->to_read_initial_md); + grpc_metadata_batch_destroy(&s->to_read_trailing_md); - unref_stream(exec_ctx, s->other_side, reason); + unref_stream(s->other_side, reason); s->other_side_closed = true; s->other_side = nullptr; } else if (!s->other_side_closed) { @@ -371,8 +365,7 @@ static void close_other_side_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, // this stream_op_batch is only one of the pending operations for this // stream. This is called when one of the pending operations for the stream // is done and about to be NULLed out -static void complete_if_batch_end_locked(grpc_exec_ctx* exec_ctx, - inproc_stream* s, grpc_error* error, +static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, grpc_transport_stream_op_batch* op, const char* msg) { int is_sm = (int)(op == s->send_message_op); @@ -383,22 +376,20 @@ static void complete_if_batch_end_locked(grpc_exec_ctx* exec_ctx, if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) { INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error); - GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_REF(error)); } } -static void maybe_schedule_op_closure_locked(grpc_exec_ctx* exec_ctx, - inproc_stream* s, +static void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) { if (s && s->ops_needed && !s->op_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error)); s->op_closure_scheduled = true; s->ops_needed = false; } } -static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, - grpc_error* error) { +static void fail_helper_locked(inproc_stream* s, grpc_error* error) { INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s); // If we're failing this side, we need to make sure that // we also send or have already sent trailing metadata @@ -415,14 +406,14 @@ static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, : &other->to_read_trailing_md; bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled : &other->to_read_trailing_md_filled; - fill_in_metadata(exec_ctx, s, &fake_md, 0, dest, nullptr, destfilled); - grpc_metadata_batch_destroy(exec_ctx, &fake_md); + fill_in_metadata(s, &fake_md, 0, dest, nullptr, destfilled); + grpc_metadata_batch_destroy(&fake_md); if (other != nullptr) { if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(error); } - maybe_schedule_op_closure_locked(exec_ctx, other, error); + maybe_schedule_op_closure_locked(other, error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(error); } @@ -436,24 +427,22 @@ static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, grpc_metadata_batch_init(&fake_md); grpc_linked_mdelem* path_md = (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*path_md)); - path_md->md = - grpc_mdelem_from_slices(exec_ctx, g_fake_path_key, g_fake_path_value); - GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, path_md) == + path_md->md = grpc_mdelem_from_slices(g_fake_path_key, g_fake_path_value); + GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, path_md) == GRPC_ERROR_NONE); grpc_linked_mdelem* auth_md = (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*auth_md)); - auth_md->md = - grpc_mdelem_from_slices(exec_ctx, g_fake_auth_key, g_fake_auth_value); - GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, auth_md) == + auth_md->md = grpc_mdelem_from_slices(g_fake_auth_key, g_fake_auth_value); + GPR_ASSERT(grpc_metadata_batch_link_tail(&fake_md, auth_md) == GRPC_ERROR_NONE); fill_in_metadata( - exec_ctx, s, &fake_md, 0, + s, &fake_md, 0, s->recv_initial_md_op->payload->recv_initial_metadata .recv_initial_metadata, s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, nullptr); - grpc_metadata_batch_destroy(exec_ctx, &fake_md); + grpc_metadata_batch_destroy(&fake_md); err = GRPC_ERROR_NONE; } else { err = GRPC_ERROR_REF(error); @@ -469,14 +458,13 @@ static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling initial-metadata-ready %p %p", s, error, err); - GRPC_CLOSURE_SCHED(exec_ctx, - s->recv_initial_md_op->payload->recv_initial_metadata + GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata .recv_initial_metadata_ready, err); // Last use of err so no need to REF and then UNREF it complete_if_batch_end_locked( - exec_ctx, s, error, s->recv_initial_md_op, + s, error, s->recv_initial_md_op, "fail_helper scheduling recv-initial-metadata-on-complete"); s->recv_initial_md_op = nullptr; } @@ -484,22 +472,22 @@ static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-ready %p", s, error); GRPC_CLOSURE_SCHED( - exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, + s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); complete_if_batch_end_locked( - exec_ctx, s, error, s->recv_message_op, + s, error, s->recv_message_op, "fail_helper scheduling recv-message-on-complete"); s->recv_message_op = nullptr; } if (s->send_message_op) { complete_if_batch_end_locked( - exec_ctx, s, error, s->send_message_op, + s, error, s->send_message_op, "fail_helper scheduling send-message-on-complete"); s->send_message_op = nullptr; } if (s->send_trailing_md_op) { complete_if_batch_end_locked( - exec_ctx, s, error, s->send_trailing_md_op, + s, error, s->send_trailing_md_op, "fail_helper scheduling send-trailng-md-on-complete"); s->send_trailing_md_op = nullptr; } @@ -508,23 +496,22 @@ static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, "fail_helper %p scheduling trailing-md-on-complete %p", s, error); complete_if_batch_end_locked( - exec_ctx, s, error, s->recv_trailing_md_op, + s, error, s->recv_trailing_md_op, "fail_helper scheduling recv-trailing-metadata-on-complete"); s->recv_trailing_md_op = nullptr; } - close_other_side_locked(exec_ctx, s, "fail_helper:other_side"); - close_stream_locked(exec_ctx, s); + close_other_side_locked(s, "fail_helper:other_side"); + close_stream_locked(s); GRPC_ERROR_UNREF(error); } -static void message_transfer_locked(grpc_exec_ctx* exec_ctx, - inproc_stream* sender, +static void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { size_t remaining = sender->send_message_op->payload->send_message.send_message->length; if (receiver->recv_inited) { - grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message); + grpc_slice_buffer_destroy_internal(&receiver->recv_message); } grpc_slice_buffer_init(&receiver->recv_message); receiver->recv_inited = true; @@ -532,13 +519,13 @@ static void message_transfer_locked(grpc_exec_ctx* exec_ctx, grpc_slice message_slice; grpc_closure unused; GPR_ASSERT(grpc_byte_stream_next( - exec_ctx, sender->send_message_op->payload->send_message.send_message, - SIZE_MAX, &unused)); + sender->send_message_op->payload->send_message.send_message, SIZE_MAX, + &unused)); grpc_error* error = grpc_byte_stream_pull( - exec_ctx, sender->send_message_op->payload->send_message.send_message, + sender->send_message_op->payload->send_message.send_message, &message_slice); if (error != GRPC_ERROR_NONE) { - cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error)); + cancel_stream_locked(sender, GRPC_ERROR_REF(error)); break; } GPR_ASSERT(error == GRPC_ERROR_NONE); @@ -553,22 +540,20 @@ static void message_transfer_locked(grpc_exec_ctx* exec_ctx, INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready", receiver); GRPC_CLOSURE_SCHED( - exec_ctx, receiver->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); complete_if_batch_end_locked( - exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op, + sender, GRPC_ERROR_NONE, sender->send_message_op, "message_transfer scheduling sender on_complete"); complete_if_batch_end_locked( - exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op, + receiver, GRPC_ERROR_NONE, receiver->recv_message_op, "message_transfer scheduling receiver on_complete"); receiver->recv_message_op = nullptr; sender->send_message_op = nullptr; } -static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void op_state_machine(void* arg, grpc_error* error) { // This function gets called when we have contents in the unprocessed reads // Get what we want based on our ops wanted // Schedule our appropriate closures @@ -589,26 +574,26 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, inproc_stream* other = s->other_side; if (s->cancel_self_error != GRPC_ERROR_NONE) { - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error)); + fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_self_error)); goto done; } else if (s->cancel_other_error != GRPC_ERROR_NONE) { - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_other_error)); + fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_other_error)); goto done; } else if (error != GRPC_ERROR_NONE) { - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(error)); + fail_helper_locked(s, GRPC_ERROR_REF(error)); goto done; } if (s->send_message_op && other) { if (other->recv_message_op) { - message_transfer_locked(exec_ctx, s, other); - maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + message_transfer_locked(s, other); + maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); } else if (!s->t->is_client && (s->trailing_md_sent || other->recv_trailing_md_op)) { // A server send will never be matched if the client is waiting // for trailing metadata already complete_if_batch_end_locked( - exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op, + s, GRPC_ERROR_NONE, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); s->send_message_op = nullptr; } @@ -630,11 +615,11 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, // The buffer is already in use; that's an error! INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s); new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata"); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + fail_helper_locked(s, GRPC_ERROR_REF(new_err)); goto done; } else { if (!other || !other->closed) { - fill_in_metadata(exec_ctx, s, + fill_in_metadata(s, s->send_trailing_md_op->payload->send_trailing_metadata .send_trailing_metadata, 0, dest, nullptr, destfilled); @@ -643,15 +628,15 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling trailing-md-on-complete", s); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, + GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete, GRPC_ERROR_NONE); s->recv_trailing_md_op = nullptr; needs_close = true; } } - maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); complete_if_batch_end_locked( - exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op, + s, GRPC_ERROR_NONE, s->send_trailing_md_op, "op_state_machine scheduling send-trailing-metadata-on-complete"); s->send_trailing_md_op = nullptr; } @@ -664,14 +649,14 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, "op_state_machine %p scheduling on_complete errors for already " "recvd initial md %p", s, new_err); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + fail_helper_locked(s, GRPC_ERROR_REF(new_err)); goto done; } if (s->to_read_initial_md_filled) { s->initial_md_recvd = true; new_err = fill_in_metadata( - exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags, + s, &s->to_read_initial_md, s->to_read_initial_md_flags, s->recv_initial_md_op->payload->recv_initial_metadata .recv_initial_metadata, s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, @@ -684,17 +669,16 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, .trailing_metadata_available = (other != nullptr && other->send_trailing_md_op != nullptr); } - grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); + grpc_metadata_batch_clear(&s->to_read_initial_md); s->to_read_initial_md_filled = false; INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling initial-metadata-ready %p", s, new_err); - GRPC_CLOSURE_SCHED(exec_ctx, - s->recv_initial_md_op->payload->recv_initial_metadata + GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata .recv_initial_metadata_ready, GRPC_ERROR_REF(new_err)); complete_if_batch_end_locked( - exec_ctx, s, new_err, s->recv_initial_md_op, + s, new_err, s->recv_initial_md_op, "op_state_machine scheduling recv-initial-metadata-on-complete"); s->recv_initial_md_op = nullptr; @@ -702,20 +686,20 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling on_complete errors2 %p", s, new_err); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + fail_helper_locked(s, GRPC_ERROR_REF(new_err)); goto done; } } } if (s->recv_message_op) { if (other && other->send_message_op) { - message_transfer_locked(exec_ctx, other, s); - maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + message_transfer_locked(other, s); + maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); } } if (s->recv_trailing_md_op && s->t->is_client && other && other->send_message_op) { - maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); } if (s->to_read_trailing_md_filled) { if (s->trailing_md_recvd) { @@ -726,7 +710,7 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, "op_state_machine %p scheduling on_complete errors for already " "recvd trailing md %p", s, new_err); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + fail_helper_locked(s, GRPC_ERROR_REF(new_err)); goto done; } if (s->recv_message_op != nullptr) { @@ -734,11 +718,10 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, // satisfied INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( - exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); complete_if_batch_end_locked( - exec_ctx, s, new_err, s->recv_message_op, + s, new_err, s->recv_message_op, "op_state_machine scheduling recv-message-on-complete"); s->recv_message_op = nullptr; } @@ -746,7 +729,7 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, // Nothing further will try to receive from this stream, so finish off // any outstanding send_message op complete_if_batch_end_locked( - exec_ctx, s, new_err, s->send_message_op, + s, new_err, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); s->send_message_op = nullptr; } @@ -754,11 +737,11 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, // We wanted trailing metadata and we got it s->trailing_md_recvd = true; new_err = - fill_in_metadata(exec_ctx, s, &s->to_read_trailing_md, 0, + fill_in_metadata(s, &s->to_read_trailing_md, 0, s->recv_trailing_md_op->payload ->recv_trailing_metadata.recv_trailing_metadata, nullptr, nullptr); - grpc_metadata_batch_clear(exec_ctx, &s->to_read_trailing_md); + grpc_metadata_batch_clear(&s->to_read_trailing_md); s->to_read_trailing_md_filled = false; // We should schedule the recv_trailing_md_op completion if @@ -770,7 +753,7 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling trailing-md-on-complete %p", s, new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, + GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete, GRPC_ERROR_REF(new_err)); s->recv_trailing_md_op = nullptr; needs_close = true; @@ -791,10 +774,10 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, // recv_message_op INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( - exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, + s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); complete_if_batch_end_locked( - exec_ctx, s, new_err, s->recv_message_op, + s, new_err, s->recv_message_op, "op_state_machine scheduling recv-message-on-complete"); s->recv_message_op = nullptr; } @@ -803,7 +786,7 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, // Nothing further will try to receive from this stream, so finish off // any outstanding send_message op complete_if_batch_end_locked( - exec_ctx, s, new_err, s->send_message_op, + s, new_err, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); s->send_message_op = nullptr; } @@ -819,22 +802,21 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, } done: if (needs_close) { - close_other_side_locked(exec_ctx, s, "op_state_machine"); - close_stream_locked(exec_ctx, s); + close_other_side_locked(s, "op_state_machine"); + close_stream_locked(s); } gpr_mu_unlock(mu); GRPC_ERROR_UNREF(new_err); } -static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, - grpc_error* error) { +static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { bool ret = false; // was the cancel accepted INPROC_LOG(GPR_DEBUG, "cancel_stream %p with %s", s, grpc_error_string(error)); if (s->cancel_self_error == GRPC_ERROR_NONE) { ret = true; s->cancel_self_error = GRPC_ERROR_REF(error); - maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error); + maybe_schedule_op_closure_locked(s, s->cancel_self_error); // Send trailing md to the other side indicating cancellation, even if we // already have s->trailing_md_sent = true; @@ -848,15 +830,14 @@ static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, : &other->to_read_trailing_md; bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled : &other->to_read_trailing_md_filled; - fill_in_metadata(exec_ctx, s, &cancel_md, 0, dest, nullptr, destfilled); - grpc_metadata_batch_destroy(exec_ctx, &cancel_md); + fill_in_metadata(s, &cancel_md, 0, dest, nullptr, destfilled); + grpc_metadata_batch_destroy(&cancel_md); if (other != nullptr) { if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error); } - maybe_schedule_op_closure_locked(exec_ctx, other, - other->cancel_other_error); + maybe_schedule_op_closure_locked(other, other->cancel_other_error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error); } @@ -866,21 +847,20 @@ static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, // md, now's the chance if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { complete_if_batch_end_locked( - exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op, + s, s->cancel_self_error, s->recv_trailing_md_op, "cancel_stream scheduling trailing-md-on-complete"); s->recv_trailing_md_op = nullptr; } } - close_other_side_locked(exec_ctx, s, "cancel_stream:other_side"); - close_stream_locked(exec_ctx, s); + close_other_side_locked(s, "cancel_stream:other_side"); + close_stream_locked(s); GRPC_ERROR_UNREF(error); return ret; } -static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, - grpc_stream* gs, +static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, grpc_transport_stream_op_batch* op) { INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %p %p", gt, gs, op); inproc_stream* s = (inproc_stream*)gs; @@ -906,7 +886,7 @@ static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, if (op->cancel_stream) { // Call cancel_stream_locked without ref'ing the cancel_error because // this function is responsible to make sure that that field gets unref'ed - cancel_stream_locked(exec_ctx, s, op->payload->cancel_stream.cancel_error); + cancel_stream_locked(s, op->payload->cancel_stream.cancel_error); // this op can complete without an error } else if (s->cancel_self_error != GRPC_ERROR_NONE) { // already self-canceled so still give it an error @@ -946,8 +926,7 @@ static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, } else { if (!other || !other->closed) { fill_in_metadata( - exec_ctx, s, - op->payload->send_initial_metadata.send_initial_metadata, + s, op->payload->send_initial_metadata.send_initial_metadata, op->payload->send_initial_metadata.send_initial_metadata_flags, dest, destflags, destfilled); } @@ -959,7 +938,7 @@ static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, s->initial_md_sent = true; } } - maybe_schedule_op_closure_locked(exec_ctx, other, error); + maybe_schedule_op_closure_locked(other, error); } } @@ -999,7 +978,7 @@ static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, (op->recv_message && other && (other->send_message_op != nullptr)) || (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { if (!s->op_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE); s->op_closure_scheduled = true; } } else { @@ -1023,7 +1002,6 @@ static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, "perform_stream_op error %p scheduling initial-metadata-ready %p", s, error); GRPC_CLOSURE_SCHED( - exec_ctx, op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } @@ -1032,28 +1010,26 @@ static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, GPR_DEBUG, "perform_stream_op error %p scheduling recv message-ready %p", s, error); - GRPC_CLOSURE_SCHED(exec_ctx, - op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); } } INPROC_LOG(GPR_DEBUG, "perform_stream_op %p scheduling on_complete %p", s, error); - GRPC_CLOSURE_SCHED(exec_ctx, on_complete, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error)); } if (needs_close) { - close_other_side_locked(exec_ctx, s, "perform_stream_op:other_side"); - close_stream_locked(exec_ctx, s); + close_other_side_locked(s, "perform_stream_op:other_side"); + close_stream_locked(s); } gpr_mu_unlock(mu); GRPC_ERROR_UNREF(error); } -static void close_transport_locked(grpc_exec_ctx* exec_ctx, - inproc_transport* t) { +static void close_transport_locked(inproc_transport* t) { INPROC_LOG(GPR_DEBUG, "close_transport %p %d", t, t->is_closed); grpc_connectivity_state_set( - exec_ctx, &t->connectivity, GRPC_CHANNEL_SHUTDOWN, + &t->connectivity, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Closing transport."), "close transport"); if (!t->is_closed) { @@ -1062,7 +1038,7 @@ static void close_transport_locked(grpc_exec_ctx* exec_ctx, while (t->stream_list != nullptr) { // cancel_stream_locked also adjusts stream list cancel_stream_locked( - exec_ctx, t->stream_list, + t->stream_list, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); @@ -1070,14 +1046,13 @@ static void close_transport_locked(grpc_exec_ctx* exec_ctx, } } -static void perform_transport_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, - grpc_transport_op* op) { +static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { inproc_transport* t = (inproc_transport*)gt; INPROC_LOG(GPR_DEBUG, "perform_transport_op %p %p", t, op); gpr_mu_lock(&t->mu->mu); if (op->on_connectivity_state_change) { grpc_connectivity_state_notify_on_state_change( - exec_ctx, &t->connectivity, op->connectivity_state, + &t->connectivity, op->connectivity_state, op->on_connectivity_state_change); } if (op->set_accept_stream) { @@ -1085,7 +1060,7 @@ static void perform_transport_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, t->accept_stream_data = op->set_accept_stream_user_data; } if (op->on_consumed) { - GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); } bool do_close = false; @@ -1099,71 +1074,67 @@ static void perform_transport_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, } if (do_close) { - close_transport_locked(exec_ctx, t); + close_transport_locked(t); } gpr_mu_unlock(&t->mu->mu); } -static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, - grpc_stream* gs, +static void destroy_stream(grpc_transport* gt, grpc_stream* gs, grpc_closure* then_schedule_closure) { INPROC_LOG(GPR_DEBUG, "destroy_stream %p %p", gs, then_schedule_closure); inproc_stream* s = (inproc_stream*)gs; s->closure_at_destroy = then_schedule_closure; - really_destroy_stream(exec_ctx, s); + really_destroy_stream(s); } -static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) { +static void destroy_transport(grpc_transport* gt) { inproc_transport* t = (inproc_transport*)gt; INPROC_LOG(GPR_DEBUG, "destroy_transport %p", t); gpr_mu_lock(&t->mu->mu); - close_transport_locked(exec_ctx, t); + close_transport_locked(t); gpr_mu_unlock(&t->mu->mu); - unref_transport(exec_ctx, t->other_side); - unref_transport(exec_ctx, t); + unref_transport(t->other_side); + unref_transport(t); } /******************************************************************************* * INTEGRATION GLUE */ -static void set_pollset(grpc_exec_ctx* exec_ctx, grpc_transport* gt, - grpc_stream* gs, grpc_pollset* pollset) { +static void set_pollset(grpc_transport* gt, grpc_stream* gs, + grpc_pollset* pollset) { // Nothing to do here } -static void set_pollset_set(grpc_exec_ctx* exec_ctx, grpc_transport* gt, - grpc_stream* gs, grpc_pollset_set* pollset_set) { +static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, + grpc_pollset_set* pollset_set) { // Nothing to do here } -static grpc_endpoint* get_endpoint(grpc_exec_ctx* exec_ctx, grpc_transport* t) { - return nullptr; -} +static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; } /******************************************************************************* * GLOBAL INIT AND DESTROY */ -static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} +static void do_nothing(void* arg, grpc_error* error) {} void grpc_inproc_transport_init(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr, grpc_schedule_on_exec_ctx); g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0); grpc_slice key_tmp = grpc_slice_from_static_string(":path"); g_fake_path_key = grpc_slice_intern(key_tmp); - grpc_slice_unref_internal(&exec_ctx, key_tmp); + grpc_slice_unref_internal(key_tmp); g_fake_path_value = grpc_slice_from_static_string("/"); grpc_slice auth_tmp = grpc_slice_from_static_string(":authority"); g_fake_auth_key = grpc_slice_intern(auth_tmp); - grpc_slice_unref_internal(&exec_ctx, auth_tmp); + grpc_slice_unref_internal(auth_tmp); g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); - grpc_exec_ctx_finish(&exec_ctx); } static const grpc_transport_vtable inproc_vtable = { @@ -1175,8 +1146,7 @@ static const grpc_transport_vtable inproc_vtable = { /******************************************************************************* * Main inproc transport functions */ -static void inproc_transports_create(grpc_exec_ctx* exec_ctx, - grpc_transport** server_transport, +static void inproc_transports_create(grpc_transport** server_transport, const grpc_channel_args* server_args, grpc_transport** client_transport, const grpc_channel_args* client_args) { @@ -1213,7 +1183,7 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server, GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2, (server, args)); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; const grpc_channel_args* server_args = grpc_server_get_channel_args(server); @@ -1228,30 +1198,26 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server, grpc_transport* server_transport; grpc_transport* client_transport; - inproc_transports_create(&exec_ctx, &server_transport, server_args, - &client_transport, client_args); + inproc_transports_create(&server_transport, server_args, &client_transport, + client_args); - grpc_server_setup_transport(&exec_ctx, server, server_transport, nullptr, - server_args); - grpc_channel* channel = - grpc_channel_create(&exec_ctx, "inproc", client_args, - GRPC_CLIENT_DIRECT_CHANNEL, client_transport); + grpc_server_setup_transport(server, server_transport, nullptr, server_args); + grpc_channel* channel = grpc_channel_create( + "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport); // Free up created channel args - grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(client_args); // Now finish scheduled operations - grpc_exec_ctx_finish(&exec_ctx); return channel; } void grpc_inproc_transport_shutdown(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_slice_unref_internal(&exec_ctx, g_empty_slice); - grpc_slice_unref_internal(&exec_ctx, g_fake_path_key); - grpc_slice_unref_internal(&exec_ctx, g_fake_path_value); - grpc_slice_unref_internal(&exec_ctx, g_fake_auth_key); - grpc_slice_unref_internal(&exec_ctx, g_fake_auth_value); - grpc_exec_ctx_finish(&exec_ctx); + grpc_core::ExecCtx exec_ctx; + grpc_slice_unref_internal(g_empty_slice); + grpc_slice_unref_internal(g_fake_path_key); + grpc_slice_unref_internal(g_fake_path_value); + grpc_slice_unref_internal(g_fake_auth_key); + grpc_slice_unref_internal(g_fake_auth_value); } -- cgit v1.2.3