diff options
Diffstat (limited to 'src/core/lib/transport/transport.cc')
-rw-r--r-- | src/core/lib/transport/transport.cc | 84 |
1 files changed, 49 insertions, 35 deletions
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 08aee04ac9..5bda1541a6 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -49,7 +49,8 @@ void grpc_stream_ref(grpc_stream_refcount* refcount) { } #ifndef NDEBUG -void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) { +void grpc_stream_unref(grpc_exec_ctx* exec_ctx, grpc_stream_refcount* refcount, + const char* reason) { if (grpc_trace_stream_refcount.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s", @@ -57,11 +58,11 @@ void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) { val - 1, reason); } #else -void grpc_stream_unref(grpc_stream_refcount* refcount) { +void grpc_stream_unref(grpc_exec_ctx* exec_ctx, + grpc_stream_refcount* refcount) { #endif if (gpr_unref(&refcount->refs)) { - if (grpc_core::ExecCtx::Get()->flags() & - GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) { + if (exec_ctx->flags & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) { /* Ick. The thread we're running on MAY be owned (indirectly) by a call-stack. If that's the case, destroying the call-stack MAY try to destroy the @@ -72,7 +73,7 @@ void grpc_stream_unref(grpc_stream_refcount* refcount) { refcount->destroy.scheduler = grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); } - GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE); } } @@ -88,11 +89,11 @@ static void slice_stream_ref(void* p) { #endif } -static void slice_stream_unref(void* p) { +static void slice_stream_unref(grpc_exec_ctx* exec_ctx, void* p) { #ifndef NDEBUG - grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p), "slice"); + grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p), "slice"); #else - grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p)); + grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p)); #endif } @@ -150,50 +151,59 @@ size_t grpc_transport_stream_size(grpc_transport* transport) { return transport->vtable->sizeof_stream; } -void grpc_transport_destroy(grpc_transport* transport) { - transport->vtable->destroy(transport); +void grpc_transport_destroy(grpc_exec_ctx* exec_ctx, + grpc_transport* transport) { + transport->vtable->destroy(exec_ctx, transport); } -int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream, +int grpc_transport_init_stream(grpc_exec_ctx* exec_ctx, + grpc_transport* transport, grpc_stream* stream, grpc_stream_refcount* refcount, const void* server_data, gpr_arena* arena) { - return transport->vtable->init_stream(transport, stream, refcount, + return transport->vtable->init_stream(exec_ctx, transport, stream, refcount, server_data, arena); } -void grpc_transport_perform_stream_op(grpc_transport* transport, +void grpc_transport_perform_stream_op(grpc_exec_ctx* exec_ctx, + grpc_transport* transport, grpc_stream* stream, grpc_transport_stream_op_batch* op) { - transport->vtable->perform_stream_op(transport, stream, op); + transport->vtable->perform_stream_op(exec_ctx, transport, stream, op); } -void grpc_transport_perform_op(grpc_transport* transport, +void grpc_transport_perform_op(grpc_exec_ctx* exec_ctx, + grpc_transport* transport, grpc_transport_op* op) { - transport->vtable->perform_op(transport, op); + transport->vtable->perform_op(exec_ctx, transport, op); } -void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream, +void grpc_transport_set_pops(grpc_exec_ctx* exec_ctx, grpc_transport* transport, + grpc_stream* stream, grpc_polling_entity* pollent) { grpc_pollset* pollset; grpc_pollset_set* pollset_set; if ((pollset = grpc_polling_entity_pollset(pollent)) != nullptr) { - transport->vtable->set_pollset(transport, stream, pollset); + transport->vtable->set_pollset(exec_ctx, transport, stream, pollset); } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) != nullptr) { - transport->vtable->set_pollset_set(transport, stream, pollset_set); + transport->vtable->set_pollset_set(exec_ctx, transport, stream, + pollset_set); } else { abort(); } } -void grpc_transport_destroy_stream(grpc_transport* transport, +void grpc_transport_destroy_stream(grpc_exec_ctx* exec_ctx, + grpc_transport* transport, grpc_stream* stream, grpc_closure* then_schedule_closure) { - transport->vtable->destroy_stream(transport, stream, then_schedule_closure); + transport->vtable->destroy_stream(exec_ctx, transport, stream, + then_schedule_closure); } -grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) { - return transport->vtable->get_endpoint(transport); +grpc_endpoint* grpc_transport_get_endpoint(grpc_exec_ctx* exec_ctx, + grpc_transport* transport) { + return transport->vtable->get_endpoint(exec_ctx, transport); } // This comment should be sung to the tune of @@ -204,23 +214,25 @@ grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) { // though it lives in lib, it handles transport stream ops sure // it's grpc_transport_stream_op_batch_finish_with_failure void grpc_transport_stream_op_batch_finish_with_failure( - grpc_transport_stream_op_batch* batch, grpc_error* error, - grpc_call_combiner* call_combiner) { + grpc_exec_ctx* exec_ctx, grpc_transport_stream_op_batch* batch, + grpc_error* error, grpc_call_combiner* call_combiner) { if (batch->send_message) { - grpc_byte_stream_destroy(batch->payload->send_message.send_message); + grpc_byte_stream_destroy(exec_ctx, + batch->payload->send_message.send_message); } if (batch->recv_message) { - GRPC_CALL_COMBINER_START( - call_combiner, batch->payload->recv_message.recv_message_ready, - GRPC_ERROR_REF(error), "failing recv_message_ready"); + GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, + batch->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error), + "failing recv_message_ready"); } if (batch->recv_initial_metadata) { GRPC_CALL_COMBINER_START( - call_combiner, + exec_ctx, call_combiner, batch->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready"); } - GRPC_CLOSURE_SCHED(batch->on_complete, error); + GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error); if (batch->cancel_stream) { GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); } @@ -232,9 +244,10 @@ typedef struct { grpc_transport_op op; } made_transport_op; -static void destroy_made_transport_op(void* arg, grpc_error* error) { +static void destroy_made_transport_op(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { made_transport_op* op = (made_transport_op*)arg; - GRPC_CLOSURE_SCHED(op->inner_on_complete, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error)); gpr_free(op); } @@ -255,11 +268,12 @@ typedef struct { grpc_transport_stream_op_batch_payload payload; } made_transport_stream_op; -static void destroy_made_transport_stream_op(void* arg, grpc_error* error) { +static void destroy_made_transport_stream_op(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { made_transport_stream_op* op = (made_transport_stream_op*)arg; grpc_closure* c = op->inner_on_complete; gpr_free(op); - GRPC_CLOSURE_RUN(c, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, c, GRPC_ERROR_REF(error)); } grpc_transport_stream_op_batch* grpc_make_transport_stream_op( |