From 0ee7574732a06e8cace4e099a678f4bd5dbff679 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 13 Oct 2017 16:07:13 -0700 Subject: Removing instances of exec_ctx being passed around in functions in src/core. exec_ctx is now a thread_local pointer of type ExecCtx instead of grpc_exec_ctx which is initialized whenever ExecCtx is instantiated. ExecCtx also keeps track of the previous exec_ctx so that nesting of exec_ctx is allowed. This means that there is only one exec_ctx being used at any time. Also, grpc_exec_ctx_finish is called in the destructor of the object, and the previous exec_ctx is restored to avoid breaking current functionality. The code still explicitly calls grpc_exec_ctx_finish because removing all such instances causes the code to break. --- .../cronet/client/secure/cronet_channel_create.cc | 5 +- .../transport/cronet/transport/cronet_transport.cc | 209 +++++++++------------ 2 files changed, 93 insertions(+), 121 deletions(-) (limited to 'src/core/ext/transport/cronet') diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc index b280487ca3..59d91a3318 100644 --- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc @@ -49,7 +49,6 @@ GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( grpc_transport *ct = grpc_create_cronet_transport(engine, target, args, reserved); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - return grpc_channel_create(&exec_ctx, target, args, - GRPC_CLIENT_DIRECT_CHANNEL, ct); + ExecCtx _local_exec_ctx; + return grpc_channel_create(target, args, GRPC_CLIENT_DIRECT_CHANNEL, ct); } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index ff1367fb28..fa7c9db710 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -197,27 +197,23 @@ typedef struct stream_obj stream_obj; #ifndef NDEBUG #define GRPC_CRONET_STREAM_REF(stream, reason) \ grpc_cronet_stream_ref((stream), (reason)) -#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \ - grpc_cronet_stream_unref((exec_ctx), (stream), (reason)) +#define GRPC_CRONET_STREAM_UNREF(stream, reason) \ + grpc_cronet_stream_unref((stream), (reason)) void grpc_cronet_stream_ref(stream_obj *s, const char *reason) { grpc_stream_ref(s->refcount, reason); } -void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s, - const char *reason) { - grpc_stream_unref(exec_ctx, s->refcount, reason); +void grpc_cronet_stream_unref(stream_obj *s, const char *reason) { + grpc_stream_unref(s->refcount, reason); } #else #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream)) -#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \ - grpc_cronet_stream_unref((exec_ctx), (stream)) +#define GRPC_CRONET_STREAM_UNREF(stream, reason) \ + grpc_cronet_stream_unref((stream)) void grpc_cronet_stream_ref(stream_obj *s) { grpc_stream_ref(s->refcount); } -void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s) { - grpc_stream_unref(exec_ctx, s->refcount); -} +void grpc_cronet_stream_unref(stream_obj *s) { grpc_stream_unref(s->refcount); } #endif -static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, - struct op_and_state *oas); +static enum e_op_result execute_stream_op(struct op_and_state *oas); /* Utility function to translate enum into string for printing @@ -373,12 +369,12 @@ static void remove_from_storage(struct stream_obj *s, This can get executed from the Cronet network thread via cronet callback or on the application supplied thread via the perform_stream_op function. */ -static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) { +static void execute_from_storage(stream_obj *s) { gpr_mu_lock(&s->mu); for (struct op_and_state *curr = s->storage.head; curr != NULL;) { CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done); GPR_ASSERT(curr->done == 0); - enum e_op_result result = execute_stream_op(exec_ctx, curr); + enum e_op_result result = execute_stream_op(curr); CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr, op_result_string(result)); /* if this op is done, then remove it and free memory */ @@ -402,7 +398,7 @@ static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) { */ static void on_failed(bidirectional_stream *stream, int net_error) { CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); @@ -419,9 +415,9 @@ static void on_failed(bidirectional_stream *stream, int net_error) { } null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(&exec_ctx, s); - GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); - grpc_exec_ctx_finish(&exec_ctx); + execute_from_storage(s); + GRPC_CRONET_STREAM_UNREF(s, "cronet transport"); + grpc_exec_ctx_finish(); } /* @@ -429,7 +425,7 @@ static void on_failed(bidirectional_stream *stream, int net_error) { */ static void on_canceled(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); @@ -446,9 +442,9 @@ static void on_canceled(bidirectional_stream *stream) { } null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(&exec_ctx, s); - GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); - grpc_exec_ctx_finish(&exec_ctx); + execute_from_storage(s); + GRPC_CRONET_STREAM_UNREF(s, "cronet transport"); + grpc_exec_ctx_finish(); } /* @@ -456,7 +452,7 @@ static void on_canceled(bidirectional_stream *stream) { */ static void on_succeeded(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); @@ -465,9 +461,9 @@ static void on_succeeded(bidirectional_stream *stream) { s->cbs = NULL; null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(&exec_ctx, s); - GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); - grpc_exec_ctx_finish(&exec_ctx); + execute_from_storage(s); + GRPC_CRONET_STREAM_UNREF(s, "cronet transport"); + grpc_exec_ctx_finish(); } /* @@ -475,7 +471,7 @@ static void on_succeeded(bidirectional_stream *stream) { */ static void on_stream_ready(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; stream_obj *s = (stream_obj *)stream->annotation; grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; gpr_mu_lock(&s->mu); @@ -495,8 +491,8 @@ static void on_stream_ready(bidirectional_stream *stream) { } } gpr_mu_unlock(&s->mu); - execute_from_storage(&exec_ctx, s); - grpc_exec_ctx_finish(&exec_ctx); + execute_from_storage(s); + grpc_exec_ctx_finish(); } /* @@ -506,7 +502,7 @@ static void on_response_headers_received( bidirectional_stream *stream, const bidirectional_stream_header_array *headers, const char *negotiated_protocol) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, headers, negotiated_protocol); stream_obj *s = (stream_obj *)stream->annotation; @@ -526,15 +522,14 @@ static void on_response_headers_received( grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, s->arena); for (size_t i = 0; i < headers->count; i++) { - GRPC_LOG_IF_ERROR( - "on_response_headers_received", - grpc_chttp2_incoming_metadata_buffer_add( - &exec_ctx, &s->state.rs.initial_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].key)), - grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].value))))); + GRPC_LOG_IF_ERROR("on_response_headers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &s->state.rs.initial_metadata, + grpc_mdelem_from_slices( + grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].value))))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -552,15 +547,15 @@ static void on_response_headers_received( s->state.pending_read_from_cronet = true; } gpr_mu_unlock(&s->mu); - execute_from_storage(&exec_ctx, s); - grpc_exec_ctx_finish(&exec_ctx); + execute_from_storage(s); + grpc_exec_ctx_finish(); } /* Cronet callback */ static void on_write_completed(bidirectional_stream *stream, const char *data) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; stream_obj *s = (stream_obj *)stream->annotation; CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); gpr_mu_lock(&s->mu); @@ -570,8 +565,8 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) { } s->state.state_callback_received[OP_SEND_MESSAGE] = true; gpr_mu_unlock(&s->mu); - execute_from_storage(&exec_ctx, s); - grpc_exec_ctx_finish(&exec_ctx); + execute_from_storage(s); + grpc_exec_ctx_finish(); } /* @@ -579,7 +574,7 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) { */ static void on_read_completed(bidirectional_stream *stream, char *data, int count) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; stream_obj *s = (stream_obj *)stream->annotation; CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); @@ -605,15 +600,15 @@ static void on_read_completed(bidirectional_stream *stream, char *data, gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - execute_from_storage(&exec_ctx, s); + execute_from_storage(s); } } else { null_and_maybe_free_read_buffer(s); s->state.rs.read_stream_closed = true; gpr_mu_unlock(&s->mu); - execute_from_storage(&exec_ctx, s); + execute_from_storage(s); } - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_finish(); } /* @@ -622,7 +617,7 @@ static void on_read_completed(bidirectional_stream *stream, char *data, static void on_response_trailers_received( bidirectional_stream *stream, const bidirectional_stream_header_array *trailers) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); stream_obj *s = (stream_obj *)stream->annotation; @@ -636,15 +631,14 @@ static void on_response_trailers_received( for (size_t i = 0; i < trailers->count; i++) { CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, trailers->headers[i].value); - GRPC_LOG_IF_ERROR( - "on_response_trailers_received", - grpc_chttp2_incoming_metadata_buffer_add( - &exec_ctx, &s->state.rs.trailing_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].key)), - grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].value))))); + GRPC_LOG_IF_ERROR("on_response_trailers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &s->state.rs.trailing_metadata, + grpc_mdelem_from_slices( + grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].value))))); s->state.rs.trailing_metadata_valid = true; if (0 == strcmp(trailers->headers[i].key, "grpc-status") && 0 != strcmp(trailers->headers[i].value, "0")) { @@ -670,17 +664,16 @@ static void on_response_trailers_received( gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - execute_from_storage(&exec_ctx, s); + execute_from_storage(s); } - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_finish(); } /* Utility function that takes the data from s->write_slice_buffer and assembles into a contiguous byte stream with 5 byte gRPC header prepended. */ -static void create_grpc_frame(grpc_exec_ctx *exec_ctx, - grpc_slice_buffer *write_slice_buffer, +static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, char **pp_write_buffer, size_t *p_write_buffer_size, uint32_t flags) { grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer); @@ -700,7 +693,7 @@ static void create_grpc_frame(grpc_exec_ctx *exec_ctx, *p++ = (uint8_t)(length); /* append actual data */ memcpy(p, GRPC_SLICE_START_PTR(slice), length); - grpc_slice_unref_internal(exec_ctx, slice); + grpc_slice_unref_internal(slice); } /* @@ -981,8 +974,7 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, /* TODO (makdharma): Break down this function in smaller chunks for readability. */ -static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, - struct op_and_state *oas) { +static enum e_op_result execute_stream_op(struct op_and_state *oas) { grpc_transport_stream_op_batch *stream_op = &oas->op; struct stream_obj *s = oas->s; grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; @@ -1040,15 +1032,14 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice slice; grpc_slice_buffer_init(&write_slice_buffer); if (1 != grpc_byte_stream_next( - exec_ctx, stream_op->payload->send_message.send_message, + stream_op->payload->send_message.send_message, stream_op->payload->send_message.send_message->length, NULL)) { /* Should never reach here */ GPR_ASSERT(false); } if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(exec_ctx, - stream_op->payload->send_message.send_message, + grpc_byte_stream_pull(stream_op->payload->send_message.send_message, &slice)) { /* Should never reach here */ GPR_ASSERT(false); @@ -1061,15 +1052,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } if (write_slice_buffer.count > 0) { size_t write_buffer_size; - create_grpc_frame(exec_ctx, &write_slice_buffer, - &stream_state->ws.write_buffer, &write_buffer_size, + create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, + &write_buffer_size, stream_op->payload->send_message.send_message->flags); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, (int)write_buffer_size, false); - grpc_slice_buffer_destroy_internal(exec_ctx, &write_slice_buffer); + grpc_slice_buffer_destroy_internal(&write_slice_buffer); if (t->use_packet_coalescing) { if (!stream_op->send_trailing_metadata) { CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); @@ -1112,25 +1103,21 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { GRPC_CLOSURE_SCHED( - exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else if (stream_state->state_callback_received[OP_FAILED]) { GRPC_CLOSURE_SCHED( - exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { GRPC_CLOSURE_SCHED( - exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else { grpc_chttp2_incoming_metadata_buffer_publish( - exec_ctx, &oas->s->state.rs.initial_metadata, + &oas->s->state.rs.initial_metadata, stream_op->payload->recv_initial_metadata.recv_initial_metadata); GRPC_CLOSURE_SCHED( - exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } @@ -1141,16 +1128,14 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); - GRPC_CLOSURE_SCHED(exec_ctx, - stream_op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); - GRPC_CLOSURE_SCHED(exec_ctx, - stream_op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1158,16 +1143,14 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); - GRPC_CLOSURE_SCHED(exec_ctx, - stream_op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->flush_read) { CRONET_LOG(GPR_DEBUG, "flush read"); - GRPC_CLOSURE_SCHED(exec_ctx, - stream_op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1200,7 +1183,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response."); /* Clean up read_slice_buffer in case there is unread data. */ grpc_slice_buffer_destroy_internal( - exec_ctx, &stream_state->rs.read_slice_buffer); + &stream_state->rs.read_slice_buffer); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); @@ -1211,7 +1194,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_op->payload->recv_message.recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; GRPC_CLOSURE_SCHED( - exec_ctx, stream_op->payload->recv_message.recv_message_ready, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1255,8 +1238,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, (size_t)stream_state->rs.length_field); null_and_maybe_free_read_buffer(s); /* Clean up read_slice_buffer in case there is unread data. */ - grpc_slice_buffer_destroy_internal(exec_ctx, - &stream_state->rs.read_slice_buffer); + grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice); @@ -1267,8 +1249,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } *((grpc_byte_buffer **)stream_op->payload->recv_message.recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - GRPC_CLOSURE_SCHED(exec_ctx, - stream_op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1291,7 +1272,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas); if (oas->s->state.rs.trailing_metadata_valid) { grpc_chttp2_incoming_metadata_buffer_publish( - exec_ctx, &oas->s->state.rs.trailing_metadata, + &oas->s->state.rs.trailing_metadata, stream_op->payload->recv_trailing_metadata.recv_trailing_metadata); stream_state->rs.trailing_metadata_valid = false; } @@ -1316,17 +1297,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete, + GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_REF(stream_state->cancel_error)); } else if (stream_state->state_callback_received[OP_FAILED]) { GRPC_CLOSURE_SCHED( - exec_ctx, stream_op->on_complete, + stream_op->on_complete, make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); } else { /* All actions in this stream_op are complete. Call the on_complete * callback */ - GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE); } oas->state.state_op_done[OP_ON_COMPLETE] = true; oas->done = true; @@ -1351,9 +1332,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, Functions used by upper layers to access transport functionality. */ -static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data, gpr_arena *arena) { +static int init_stream(grpc_transport *gt, grpc_stream *gs, + grpc_stream_refcount *refcount, const void *server_data, + gpr_arena *arena) { stream_obj *s = (stream_obj *)gs; s->refcount = refcount; @@ -1384,15 +1365,13 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, return 0; } -static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset *pollset) {} +static void set_pollset_do_nothing(grpc_transport *gt, grpc_stream *gs, + grpc_pollset *pollset) {} -static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, - grpc_transport *gt, grpc_stream *gs, +static void set_pollset_set_do_nothing(grpc_transport *gt, grpc_stream *gs, grpc_pollset_set *pollset_set) {} -static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, +static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op_batch *op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); if (op->send_initial_metadata && @@ -1402,42 +1381,36 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, this field is present in metadata */ if (op->recv_initial_metadata) { GRPC_CLOSURE_SCHED( - exec_ctx, op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_CANCELLED); } if (op->recv_message) { - GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready, GRPC_ERROR_CANCELLED); } - GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED); return; } stream_obj *s = (stream_obj *)gs; add_to_storage(s, op); - execute_from_storage(exec_ctx, s); + execute_from_storage(s); } -static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, +static void destroy_stream(grpc_transport *gt, grpc_stream *gs, grpc_closure *then_schedule_closure) { stream_obj *s = (stream_obj *)gs; null_and_maybe_free_read_buffer(s); /* Clean up read_slice_buffer in case there is unread data. */ - grpc_slice_buffer_destroy_internal(exec_ctx, &s->state.rs.read_slice_buffer); + grpc_slice_buffer_destroy_internal(&s->state.rs.read_slice_buffer); GRPC_ERROR_UNREF(s->state.cancel_error); - GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); } -static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} +static void destroy_transport(grpc_transport *gt) {} -static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, - grpc_transport *gt) { - return NULL; -} +static grpc_endpoint *get_endpoint(grpc_transport *gt) { return NULL; } -static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_transport_op *op) {} +static void perform_op(grpc_transport *gt, grpc_transport_op *op) {} static const grpc_transport_vtable grpc_cronet_vtable = { sizeof(stream_obj), -- cgit v1.2.3