diff options
Diffstat (limited to 'src/core/lib/transport/transport.cc')
-rw-r--r-- | src/core/lib/transport/transport.cc | 84 |
1 files changed, 35 insertions, 49 deletions
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 5bda1541a6..08aee04ac9 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -49,8 +49,7 @@ void grpc_stream_ref(grpc_stream_refcount* refcount) { } #ifndef NDEBUG -void grpc_stream_unref(grpc_exec_ctx* exec_ctx, grpc_stream_refcount* refcount, - const char* reason) { +void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) { if (grpc_trace_stream_refcount.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s", @@ -58,11 +57,11 @@ void grpc_stream_unref(grpc_exec_ctx* exec_ctx, grpc_stream_refcount* refcount, val - 1, reason); } #else -void grpc_stream_unref(grpc_exec_ctx* exec_ctx, - grpc_stream_refcount* refcount) { +void grpc_stream_unref(grpc_stream_refcount* refcount) { #endif if (gpr_unref(&refcount->refs)) { - if (exec_ctx->flags & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) { + if (grpc_core::ExecCtx::Get()->flags() & + GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) { /* Ick. The thread we're running on MAY be owned (indirectly) by a call-stack. If that's the case, destroying the call-stack MAY try to destroy the @@ -73,7 +72,7 @@ void grpc_stream_unref(grpc_exec_ctx* exec_ctx, refcount->destroy.scheduler = grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); } - GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE); } } @@ -89,11 +88,11 @@ static void slice_stream_ref(void* p) { #endif } -static void slice_stream_unref(grpc_exec_ctx* exec_ctx, void* p) { +static void slice_stream_unref(void* p) { #ifndef NDEBUG - grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p), "slice"); + grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p), "slice"); #else - grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p)); + grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p)); #endif } @@ -151,59 +150,50 @@ size_t grpc_transport_stream_size(grpc_transport* transport) { return transport->vtable->sizeof_stream; } -void grpc_transport_destroy(grpc_exec_ctx* exec_ctx, - grpc_transport* transport) { - transport->vtable->destroy(exec_ctx, transport); +void grpc_transport_destroy(grpc_transport* transport) { + transport->vtable->destroy(transport); } -int grpc_transport_init_stream(grpc_exec_ctx* exec_ctx, - grpc_transport* transport, grpc_stream* stream, +int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream, grpc_stream_refcount* refcount, const void* server_data, gpr_arena* arena) { - return transport->vtable->init_stream(exec_ctx, transport, stream, refcount, + return transport->vtable->init_stream(transport, stream, refcount, server_data, arena); } -void grpc_transport_perform_stream_op(grpc_exec_ctx* exec_ctx, - grpc_transport* transport, +void grpc_transport_perform_stream_op(grpc_transport* transport, grpc_stream* stream, grpc_transport_stream_op_batch* op) { - transport->vtable->perform_stream_op(exec_ctx, transport, stream, op); + transport->vtable->perform_stream_op(transport, stream, op); } -void grpc_transport_perform_op(grpc_exec_ctx* exec_ctx, - grpc_transport* transport, +void grpc_transport_perform_op(grpc_transport* transport, grpc_transport_op* op) { - transport->vtable->perform_op(exec_ctx, transport, op); + transport->vtable->perform_op(transport, op); } -void grpc_transport_set_pops(grpc_exec_ctx* exec_ctx, grpc_transport* transport, - grpc_stream* stream, +void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream, grpc_polling_entity* pollent) { grpc_pollset* pollset; grpc_pollset_set* pollset_set; if ((pollset = grpc_polling_entity_pollset(pollent)) != nullptr) { - transport->vtable->set_pollset(exec_ctx, transport, stream, pollset); + transport->vtable->set_pollset(transport, stream, pollset); } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) != nullptr) { - transport->vtable->set_pollset_set(exec_ctx, transport, stream, - pollset_set); + transport->vtable->set_pollset_set(transport, stream, pollset_set); } else { abort(); } } -void grpc_transport_destroy_stream(grpc_exec_ctx* exec_ctx, - grpc_transport* transport, +void grpc_transport_destroy_stream(grpc_transport* transport, grpc_stream* stream, grpc_closure* then_schedule_closure) { - transport->vtable->destroy_stream(exec_ctx, transport, stream, - then_schedule_closure); + transport->vtable->destroy_stream(transport, stream, then_schedule_closure); } -grpc_endpoint* grpc_transport_get_endpoint(grpc_exec_ctx* exec_ctx, - grpc_transport* transport) { - return transport->vtable->get_endpoint(exec_ctx, transport); +grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) { + return transport->vtable->get_endpoint(transport); } // This comment should be sung to the tune of @@ -214,25 +204,23 @@ grpc_endpoint* grpc_transport_get_endpoint(grpc_exec_ctx* exec_ctx, // though it lives in lib, it handles transport stream ops sure // it's grpc_transport_stream_op_batch_finish_with_failure void grpc_transport_stream_op_batch_finish_with_failure( - grpc_exec_ctx* exec_ctx, grpc_transport_stream_op_batch* batch, - grpc_error* error, grpc_call_combiner* call_combiner) { + grpc_transport_stream_op_batch* batch, grpc_error* error, + grpc_call_combiner* call_combiner) { if (batch->send_message) { - grpc_byte_stream_destroy(exec_ctx, - batch->payload->send_message.send_message); + grpc_byte_stream_destroy(batch->payload->send_message.send_message); } if (batch->recv_message) { - GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, - batch->payload->recv_message.recv_message_ready, - GRPC_ERROR_REF(error), - "failing recv_message_ready"); + GRPC_CALL_COMBINER_START( + call_combiner, batch->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error), "failing recv_message_ready"); } if (batch->recv_initial_metadata) { GRPC_CALL_COMBINER_START( - exec_ctx, call_combiner, + call_combiner, batch->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready"); } - GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error); + GRPC_CLOSURE_SCHED(batch->on_complete, error); if (batch->cancel_stream) { GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); } @@ -244,10 +232,9 @@ typedef struct { grpc_transport_op op; } made_transport_op; -static void destroy_made_transport_op(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void destroy_made_transport_op(void* arg, grpc_error* error) { made_transport_op* op = (made_transport_op*)arg; - GRPC_CLOSURE_SCHED(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(op->inner_on_complete, GRPC_ERROR_REF(error)); gpr_free(op); } @@ -268,12 +255,11 @@ typedef struct { grpc_transport_stream_op_batch_payload payload; } made_transport_stream_op; -static void destroy_made_transport_stream_op(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void destroy_made_transport_stream_op(void* arg, grpc_error* error) { made_transport_stream_op* op = (made_transport_stream_op*)arg; grpc_closure* c = op->inner_on_complete; gpr_free(op); - GRPC_CLOSURE_RUN(exec_ctx, c, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(c, GRPC_ERROR_REF(error)); } grpc_transport_stream_op_batch* grpc_make_transport_stream_op( |