diff options
Diffstat (limited to 'src/core/ext/transport/cronet/transport/cronet_transport.cc')
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.cc | 173 |
1 files changed, 104 insertions, 69 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index c9fd94176b..4d24efe47b 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -197,23 +197,27 @@ typedef struct stream_obj stream_obj; #ifndef NDEBUG #define GRPC_CRONET_STREAM_REF(stream, reason) \ grpc_cronet_stream_ref((stream), (reason)) -#define GRPC_CRONET_STREAM_UNREF(stream, reason) \ - grpc_cronet_stream_unref((stream), (reason)) +#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \ + grpc_cronet_stream_unref((exec_ctx), (stream), (reason)) void grpc_cronet_stream_ref(stream_obj* s, const char* reason) { grpc_stream_ref(s->refcount, reason); } -void grpc_cronet_stream_unref(stream_obj* s, const char* reason) { - grpc_stream_unref(s->refcount, reason); +void grpc_cronet_stream_unref(grpc_exec_ctx* exec_ctx, stream_obj* s, + const char* reason) { + grpc_stream_unref(exec_ctx, s->refcount, reason); } #else #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream)) -#define GRPC_CRONET_STREAM_UNREF(stream, reason) \ - grpc_cronet_stream_unref((stream)) +#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \ + grpc_cronet_stream_unref((exec_ctx), (stream)) void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); } -void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); } +void grpc_cronet_stream_unref(grpc_exec_ctx* exec_ctx, stream_obj* s) { + grpc_stream_unref(exec_ctx, s->refcount); +} #endif -static enum e_op_result execute_stream_op(struct op_and_state* oas); +static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx, + struct op_and_state* oas); /* Utility function to translate enum into string for printing @@ -369,12 +373,12 @@ static void remove_from_storage(struct stream_obj* s, This can get executed from the Cronet network thread via cronet callback or on the application supplied thread via the perform_stream_op function. */ -static void execute_from_storage(stream_obj* s) { +static void execute_from_storage(grpc_exec_ctx* exec_ctx, stream_obj* s) { gpr_mu_lock(&s->mu); for (struct op_and_state* curr = s->storage.head; curr != nullptr;) { CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done); GPR_ASSERT(curr->done == 0); - enum e_op_result result = execute_stream_op(curr); + enum e_op_result result = execute_stream_op(exec_ctx, curr); CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr, op_result_string(result)); /* if this op is done, then remove it and free memory */ @@ -398,7 +402,7 @@ static void execute_from_storage(stream_obj* s) { */ static void on_failed(bidirectional_stream* stream, int net_error) { CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj* s = (stream_obj*)stream->annotation; gpr_mu_lock(&s->mu); @@ -415,8 +419,9 @@ static void on_failed(bidirectional_stream* stream, int net_error) { } null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(s); - GRPC_CRONET_STREAM_UNREF(s, "cronet transport"); + execute_from_storage(&exec_ctx, s); + GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -424,7 +429,7 @@ static void on_failed(bidirectional_stream* stream, int net_error) { */ static void on_canceled(bidirectional_stream* stream) { CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj* s = (stream_obj*)stream->annotation; gpr_mu_lock(&s->mu); @@ -441,8 +446,9 @@ static void on_canceled(bidirectional_stream* stream) { } null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(s); - GRPC_CRONET_STREAM_UNREF(s, "cronet transport"); + execute_from_storage(&exec_ctx, s); + GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -450,7 +456,7 @@ static void on_canceled(bidirectional_stream* stream) { */ static void on_succeeded(bidirectional_stream* stream) { CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj* s = (stream_obj*)stream->annotation; gpr_mu_lock(&s->mu); @@ -459,8 +465,9 @@ static void on_succeeded(bidirectional_stream* stream) { s->cbs = nullptr; null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(s); - GRPC_CRONET_STREAM_UNREF(s, "cronet transport"); + execute_from_storage(&exec_ctx, s); + GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -468,7 +475,7 @@ static void on_succeeded(bidirectional_stream* stream) { */ static void on_stream_ready(bidirectional_stream* stream) { CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj* s = (stream_obj*)stream->annotation; grpc_cronet_transport* t = (grpc_cronet_transport*)s->curr_ct; gpr_mu_lock(&s->mu); @@ -488,7 +495,8 @@ static void on_stream_ready(bidirectional_stream* stream) { } } gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -498,7 +506,7 @@ static void on_response_headers_received( bidirectional_stream* stream, const bidirectional_stream_header_array* headers, const char* negotiated_protocol) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, headers, negotiated_protocol); stream_obj* s = (stream_obj*)stream->annotation; @@ -520,8 +528,9 @@ static void on_response_headers_received( for (size_t i = 0; i < headers->count; i++) { GRPC_LOG_IF_ERROR("on_response_headers_received", grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.initial_metadata, + &exec_ctx, &s->state.rs.initial_metadata, grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( headers->headers[i].key)), grpc_slice_intern(grpc_slice_from_static_string( @@ -543,14 +552,15 @@ static void on_response_headers_received( s->state.pending_read_from_cronet = true; } gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } /* Cronet callback */ static void on_write_completed(bidirectional_stream* stream, const char* data) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj* s = (stream_obj*)stream->annotation; CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); gpr_mu_lock(&s->mu); @@ -560,7 +570,8 @@ static void on_write_completed(bidirectional_stream* stream, const char* data) { } s->state.state_callback_received[OP_SEND_MESSAGE] = true; gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -568,7 +579,7 @@ static void on_write_completed(bidirectional_stream* stream, const char* data) { */ static void on_read_completed(bidirectional_stream* stream, char* data, int count) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj* s = (stream_obj*)stream->annotation; CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); @@ -594,14 +605,15 @@ static void on_read_completed(bidirectional_stream* stream, char* data, gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); } } else { null_and_maybe_free_read_buffer(s); s->state.rs.read_stream_closed = true; gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); } + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -610,7 +622,7 @@ static void on_read_completed(bidirectional_stream* stream, char* data, static void on_response_trailers_received( bidirectional_stream* stream, const bidirectional_stream_header_array* trailers) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); stream_obj* s = (stream_obj*)stream->annotation; @@ -626,8 +638,9 @@ static void on_response_trailers_received( trailers->headers[i].value); GRPC_LOG_IF_ERROR("on_response_trailers_received", grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.trailing_metadata, + &exec_ctx, &s->state.rs.trailing_metadata, grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( trailers->headers[i].key)), grpc_slice_intern(grpc_slice_from_static_string( @@ -657,15 +670,17 @@ static void on_response_trailers_received( gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); } + grpc_exec_ctx_finish(&exec_ctx); } /* Utility function that takes the data from s->write_slice_buffer and assembles into a contiguous byte stream with 5 byte gRPC header prepended. */ -static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer, +static void create_grpc_frame(grpc_exec_ctx* exec_ctx, + grpc_slice_buffer* write_slice_buffer, char** pp_write_buffer, size_t* p_write_buffer_size, uint32_t flags) { grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer); @@ -685,7 +700,7 @@ static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer, *p++ = (uint8_t)(length); /* append actual data */ memcpy(p, GRPC_SLICE_START_PTR(slice), length); - grpc_slice_unref_internal(slice); + grpc_slice_unref_internal(exec_ctx, slice); } /* @@ -966,7 +981,8 @@ static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op, /* TODO (makdharma): Break down this function in smaller chunks for readability. */ -static enum e_op_result execute_stream_op(struct op_and_state* oas) { +static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx, + struct op_and_state* oas) { grpc_transport_stream_op_batch* stream_op = &oas->op; struct stream_obj* s = oas->s; grpc_cronet_transport* t = (grpc_cronet_transport*)s->curr_ct; @@ -1024,14 +1040,15 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { grpc_slice slice; grpc_slice_buffer_init(&write_slice_buffer); if (1 != grpc_byte_stream_next( - stream_op->payload->send_message.send_message, + exec_ctx, stream_op->payload->send_message.send_message, stream_op->payload->send_message.send_message->length, nullptr)) { /* Should never reach here */ GPR_ASSERT(false); } if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(stream_op->payload->send_message.send_message, + grpc_byte_stream_pull(exec_ctx, + stream_op->payload->send_message.send_message, &slice)) { /* Should never reach here */ GPR_ASSERT(false); @@ -1044,15 +1061,15 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { } if (write_slice_buffer.count > 0) { size_t write_buffer_size; - create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, - &write_buffer_size, + create_grpc_frame(exec_ctx, &write_slice_buffer, + &stream_state->ws.write_buffer, &write_buffer_size, stream_op->payload->send_message.send_message->flags); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, (int)write_buffer_size, false); - grpc_slice_buffer_destroy_internal(&write_slice_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &write_slice_buffer); if (t->use_packet_coalescing) { if (!stream_op->send_trailing_metadata) { CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); @@ -1095,21 +1112,25 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { GRPC_CLOSURE_SCHED( + exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else if (stream_state->state_callback_received[OP_FAILED]) { GRPC_CLOSURE_SCHED( + exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { GRPC_CLOSURE_SCHED( + exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else { grpc_chttp2_incoming_metadata_buffer_publish( - &oas->s->state.rs.initial_metadata, + exec_ctx, &oas->s->state.rs.initial_metadata, stream_op->payload->recv_initial_metadata.recv_initial_metadata); GRPC_CLOSURE_SCHED( + exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } @@ -1120,14 +1141,16 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); - GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); - GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1135,14 +1158,16 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); - GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->flush_read) { CRONET_LOG(GPR_DEBUG, "flush read"); - GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1175,7 +1200,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response."); /* Clean up read_slice_buffer in case there is unread data. */ grpc_slice_buffer_destroy_internal( - &stream_state->rs.read_slice_buffer); + exec_ctx, &stream_state->rs.read_slice_buffer); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); @@ -1185,7 +1210,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { *((grpc_byte_buffer**)stream_op->payload->recv_message.recv_message) = (grpc_byte_buffer*)&stream_state->rs.sbs; GRPC_CLOSURE_SCHED( - stream_op->payload->recv_message.recv_message_ready, + exec_ctx, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1229,7 +1254,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { (size_t)stream_state->rs.length_field); null_and_maybe_free_read_buffer(s); /* Clean up read_slice_buffer in case there is unread data. */ - grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, + &stream_state->rs.read_slice_buffer); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice); @@ -1240,7 +1266,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { } *((grpc_byte_buffer**)stream_op->payload->recv_message.recv_message) = (grpc_byte_buffer*)&stream_state->rs.sbs; - GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1263,7 +1290,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas); if (oas->s->state.rs.trailing_metadata_valid) { grpc_chttp2_incoming_metadata_buffer_publish( - &oas->s->state.rs.trailing_metadata, + exec_ctx, &oas->s->state.rs.trailing_metadata, stream_op->payload->recv_trailing_metadata.recv_trailing_metadata); stream_state->rs.trailing_metadata_valid = false; } @@ -1288,17 +1315,17 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - GRPC_CLOSURE_SCHED(stream_op->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete, GRPC_ERROR_REF(stream_state->cancel_error)); } else if (stream_state->state_callback_received[OP_FAILED]) { GRPC_CLOSURE_SCHED( - stream_op->on_complete, + exec_ctx, stream_op->on_complete, make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); } else { /* All actions in this stream_op are complete. Call the on_complete * callback */ - GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE); } oas->state.state_op_done[OP_ON_COMPLETE] = true; oas->done = true; @@ -1323,9 +1350,9 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { Functions used by upper layers to access transport functionality. */ -static int init_stream(grpc_transport* gt, grpc_stream* gs, - grpc_stream_refcount* refcount, const void* server_data, - gpr_arena* arena) { +static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena) { stream_obj* s = (stream_obj*)gs; s->refcount = refcount; @@ -1356,13 +1383,15 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, return 0; } -static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs, - grpc_pollset* pollset) {} +static void set_pollset_do_nothing(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_pollset* pollset) {} -static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs, +static void set_pollset_set_do_nothing(grpc_exec_ctx* exec_ctx, + grpc_transport* gt, grpc_stream* gs, grpc_pollset_set* pollset_set) {} -static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, +static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_transport_stream_op_batch* op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); if (op->send_initial_metadata && @@ -1372,36 +1401,42 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, this field is present in metadata */ if (op->recv_initial_metadata) { GRPC_CLOSURE_SCHED( + exec_ctx, op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_CANCELLED); } if (op->recv_message) { - GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready, GRPC_ERROR_CANCELLED); } - GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); return; } stream_obj* s = (stream_obj*)gs; add_to_storage(s, op); - execute_from_storage(s); + execute_from_storage(exec_ctx, s); } -static void destroy_stream(grpc_transport* gt, grpc_stream* gs, +static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_closure* then_schedule_closure) { stream_obj* s = (stream_obj*)gs; null_and_maybe_free_read_buffer(s); /* Clean up read_slice_buffer in case there is unread data. */ - grpc_slice_buffer_destroy_internal(&s->state.rs.read_slice_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &s->state.rs.read_slice_buffer); GRPC_ERROR_UNREF(s->state.cancel_error); - GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } -static void destroy_transport(grpc_transport* gt) {} +static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) {} -static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; } +static grpc_endpoint* get_endpoint(grpc_exec_ctx* exec_ctx, + grpc_transport* gt) { + return nullptr; +} -static void perform_op(grpc_transport* gt, grpc_transport_op* op) {} +static void perform_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_transport_op* op) {} static const grpc_transport_vtable grpc_cronet_vtable = { sizeof(stream_obj), |