aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/cronet
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-10-13 16:07:13 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2017-10-18 17:12:19 -0700
commit0ee7574732a06e8cace4e099a678f4bd5dbff679 (patch)
treee43d5de442fdcc3d39cd5af687f319fa39612d3f /src/core/ext/transport/cronet
parent6bf5f833efe2cb9e2ecc14358dd9699cd5d05263 (diff)
Removing instances of exec_ctx being passed around in functions in
src/core. exec_ctx is now a thread_local pointer of type ExecCtx instead of grpc_exec_ctx which is initialized whenever ExecCtx is instantiated. ExecCtx also keeps track of the previous exec_ctx so that nesting of exec_ctx is allowed. This means that there is only one exec_ctx being used at any time. Also, grpc_exec_ctx_finish is called in the destructor of the object, and the previous exec_ctx is restored to avoid breaking current functionality. The code still explicitly calls grpc_exec_ctx_finish because removing all such instances causes the code to break.
Diffstat (limited to 'src/core/ext/transport/cronet')
-rw-r--r--src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc5
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.cc209
2 files changed, 93 insertions, 121 deletions
diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc
index b280487ca3..59d91a3318 100644
--- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc
+++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc
@@ -49,7 +49,6 @@ GRPCAPI grpc_channel *grpc_cronet_secure_channel_create(
grpc_transport *ct =
grpc_create_cronet_transport(engine, target, args, reserved);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- return grpc_channel_create(&exec_ctx, target, args,
- GRPC_CLIENT_DIRECT_CHANNEL, ct);
+ ExecCtx _local_exec_ctx;
+ return grpc_channel_create(target, args, GRPC_CLIENT_DIRECT_CHANNEL, ct);
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index ff1367fb28..fa7c9db710 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -197,27 +197,23 @@ typedef struct stream_obj stream_obj;
#ifndef NDEBUG
#define GRPC_CRONET_STREAM_REF(stream, reason) \
grpc_cronet_stream_ref((stream), (reason))
-#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
- grpc_cronet_stream_unref((exec_ctx), (stream), (reason))
+#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
+ grpc_cronet_stream_unref((stream), (reason))
void grpc_cronet_stream_ref(stream_obj *s, const char *reason) {
grpc_stream_ref(s->refcount, reason);
}
-void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s,
- const char *reason) {
- grpc_stream_unref(exec_ctx, s->refcount, reason);
+void grpc_cronet_stream_unref(stream_obj *s, const char *reason) {
+ grpc_stream_unref(s->refcount, reason);
}
#else
#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
-#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
- grpc_cronet_stream_unref((exec_ctx), (stream))
+#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
+ grpc_cronet_stream_unref((stream))
void grpc_cronet_stream_ref(stream_obj *s) { grpc_stream_ref(s->refcount); }
-void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s) {
- grpc_stream_unref(exec_ctx, s->refcount);
-}
+void grpc_cronet_stream_unref(stream_obj *s) { grpc_stream_unref(s->refcount); }
#endif
-static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
- struct op_and_state *oas);
+static enum e_op_result execute_stream_op(struct op_and_state *oas);
/*
Utility function to translate enum into string for printing
@@ -373,12 +369,12 @@ static void remove_from_storage(struct stream_obj *s,
This can get executed from the Cronet network thread via cronet callback
or on the application supplied thread via the perform_stream_op function.
*/
-static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) {
+static void execute_from_storage(stream_obj *s) {
gpr_mu_lock(&s->mu);
for (struct op_and_state *curr = s->storage.head; curr != NULL;) {
CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
GPR_ASSERT(curr->done == 0);
- enum e_op_result result = execute_stream_op(exec_ctx, curr);
+ enum e_op_result result = execute_stream_op(curr);
CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
op_result_string(result));
/* if this op is done, then remove it and free memory */
@@ -402,7 +398,7 @@ static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) {
*/
static void on_failed(bidirectional_stream *stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
@@ -419,9 +415,9 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
+ GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
+ grpc_exec_ctx_finish();
}
/*
@@ -429,7 +425,7 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
*/
static void on_canceled(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
@@ -446,9 +442,9 @@ static void on_canceled(bidirectional_stream *stream) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
+ GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
+ grpc_exec_ctx_finish();
}
/*
@@ -456,7 +452,7 @@ static void on_canceled(bidirectional_stream *stream) {
*/
static void on_succeeded(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
@@ -465,9 +461,9 @@ static void on_succeeded(bidirectional_stream *stream) {
s->cbs = NULL;
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
+ GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
+ grpc_exec_ctx_finish();
}
/*
@@ -475,7 +471,7 @@ static void on_succeeded(bidirectional_stream *stream) {
*/
static void on_stream_ready(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
stream_obj *s = (stream_obj *)stream->annotation;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
gpr_mu_lock(&s->mu);
@@ -495,8 +491,8 @@ static void on_stream_ready(bidirectional_stream *stream) {
}
}
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
+ grpc_exec_ctx_finish();
}
/*
@@ -506,7 +502,7 @@ static void on_response_headers_received(
bidirectional_stream *stream,
const bidirectional_stream_header_array *headers,
const char *negotiated_protocol) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
headers, negotiated_protocol);
stream_obj *s = (stream_obj *)stream->annotation;
@@ -526,15 +522,14 @@ static void on_response_headers_received(
grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata,
s->arena);
for (size_t i = 0; i < headers->count; i++) {
- GRPC_LOG_IF_ERROR(
- "on_response_headers_received",
- grpc_chttp2_incoming_metadata_buffer_add(
- &exec_ctx, &s->state.rs.initial_metadata,
- grpc_mdelem_from_slices(
- &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
- headers->headers[i].key)),
- grpc_slice_intern(grpc_slice_from_static_string(
- headers->headers[i].value)))));
+ GRPC_LOG_IF_ERROR("on_response_headers_received",
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->state.rs.initial_metadata,
+ grpc_mdelem_from_slices(
+ grpc_slice_intern(grpc_slice_from_static_string(
+ headers->headers[i].key)),
+ grpc_slice_intern(grpc_slice_from_static_string(
+ headers->headers[i].value)))));
}
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
@@ -552,15 +547,15 @@ static void on_response_headers_received(
s->state.pending_read_from_cronet = true;
}
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
+ grpc_exec_ctx_finish();
}
/*
Cronet callback
*/
static void on_write_completed(bidirectional_stream *stream, const char *data) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
gpr_mu_lock(&s->mu);
@@ -570,8 +565,8 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) {
}
s->state.state_callback_received[OP_SEND_MESSAGE] = true;
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
+ grpc_exec_ctx_finish();
}
/*
@@ -579,7 +574,7 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) {
*/
static void on_read_completed(bidirectional_stream *stream, char *data,
int count) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count);
@@ -605,15 +600,15 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
+ execute_from_storage(s);
}
} else {
null_and_maybe_free_read_buffer(s);
s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
+ execute_from_storage(s);
}
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_exec_ctx_finish();
}
/*
@@ -622,7 +617,7 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
static void on_response_trailers_received(
bidirectional_stream *stream,
const bidirectional_stream_header_array *trailers) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers);
stream_obj *s = (stream_obj *)stream->annotation;
@@ -636,15 +631,14 @@ static void on_response_trailers_received(
for (size_t i = 0; i < trailers->count; i++) {
CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key,
trailers->headers[i].value);
- GRPC_LOG_IF_ERROR(
- "on_response_trailers_received",
- grpc_chttp2_incoming_metadata_buffer_add(
- &exec_ctx, &s->state.rs.trailing_metadata,
- grpc_mdelem_from_slices(
- &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
- trailers->headers[i].key)),
- grpc_slice_intern(grpc_slice_from_static_string(
- trailers->headers[i].value)))));
+ GRPC_LOG_IF_ERROR("on_response_trailers_received",
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->state.rs.trailing_metadata,
+ grpc_mdelem_from_slices(
+ grpc_slice_intern(grpc_slice_from_static_string(
+ trailers->headers[i].key)),
+ grpc_slice_intern(grpc_slice_from_static_string(
+ trailers->headers[i].value)))));
s->state.rs.trailing_metadata_valid = true;
if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
0 != strcmp(trailers->headers[i].value, "0")) {
@@ -670,17 +664,16 @@ static void on_response_trailers_received(
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
+ execute_from_storage(s);
}
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_exec_ctx_finish();
}
/*
Utility function that takes the data from s->write_slice_buffer and assembles
into a contiguous byte stream with 5 byte gRPC header prepended.
*/
-static void create_grpc_frame(grpc_exec_ctx *exec_ctx,
- grpc_slice_buffer *write_slice_buffer,
+static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer,
char **pp_write_buffer,
size_t *p_write_buffer_size, uint32_t flags) {
grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
@@ -700,7 +693,7 @@ static void create_grpc_frame(grpc_exec_ctx *exec_ctx,
*p++ = (uint8_t)(length);
/* append actual data */
memcpy(p, GRPC_SLICE_START_PTR(slice), length);
- grpc_slice_unref_internal(exec_ctx, slice);
+ grpc_slice_unref_internal(slice);
}
/*
@@ -981,8 +974,7 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op,
/*
TODO (makdharma): Break down this function in smaller chunks for readability.
*/
-static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
- struct op_and_state *oas) {
+static enum e_op_result execute_stream_op(struct op_and_state *oas) {
grpc_transport_stream_op_batch *stream_op = &oas->op;
struct stream_obj *s = oas->s;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
@@ -1040,15 +1032,14 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
if (1 != grpc_byte_stream_next(
- exec_ctx, stream_op->payload->send_message.send_message,
+ stream_op->payload->send_message.send_message,
stream_op->payload->send_message.send_message->length,
NULL)) {
/* Should never reach here */
GPR_ASSERT(false);
}
if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- stream_op->payload->send_message.send_message,
+ grpc_byte_stream_pull(stream_op->payload->send_message.send_message,
&slice)) {
/* Should never reach here */
GPR_ASSERT(false);
@@ -1061,15 +1052,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
}
if (write_slice_buffer.count > 0) {
size_t write_buffer_size;
- create_grpc_frame(exec_ctx, &write_slice_buffer,
- &stream_state->ws.write_buffer, &write_buffer_size,
+ create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
+ &write_buffer_size,
stream_op->payload->send_message.send_message->flags);
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
(int)write_buffer_size, false);
- grpc_slice_buffer_destroy_internal(exec_ctx, &write_slice_buffer);
+ grpc_slice_buffer_destroy_internal(&write_slice_buffer);
if (t->use_packet_coalescing) {
if (!stream_op->send_trailing_metadata) {
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
@@ -1112,25 +1103,21 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
GRPC_CLOSURE_SCHED(
- exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else if (stream_state->state_callback_received[OP_FAILED]) {
GRPC_CLOSURE_SCHED(
- exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
GRPC_CLOSURE_SCHED(
- exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
- exec_ctx, &oas->s->state.rs.initial_metadata,
+ &oas->s->state.rs.initial_metadata,
stream_op->payload->recv_initial_metadata.recv_initial_metadata);
GRPC_CLOSURE_SCHED(
- exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
}
@@ -1141,16 +1128,14 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
- GRPC_CLOSURE_SCHED(exec_ctx,
- stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
- GRPC_CLOSURE_SCHED(exec_ctx,
- stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1158,16 +1143,14 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
- GRPC_CLOSURE_SCHED(exec_ctx,
- stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->flush_read) {
CRONET_LOG(GPR_DEBUG, "flush read");
- GRPC_CLOSURE_SCHED(exec_ctx,
- stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1200,7 +1183,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
/* Clean up read_slice_buffer in case there is unread data. */
grpc_slice_buffer_destroy_internal(
- exec_ctx, &stream_state->rs.read_slice_buffer);
+ &stream_state->rs.read_slice_buffer);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
&stream_state->rs.read_slice_buffer, 0);
@@ -1211,7 +1194,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_op->payload->recv_message.recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
GRPC_CLOSURE_SCHED(
- exec_ctx, stream_op->payload->recv_message.recv_message_ready,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1255,8 +1238,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
(size_t)stream_state->rs.length_field);
null_and_maybe_free_read_buffer(s);
/* Clean up read_slice_buffer in case there is unread data. */
- grpc_slice_buffer_destroy_internal(exec_ctx,
- &stream_state->rs.read_slice_buffer);
+ grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
read_data_slice);
@@ -1267,8 +1249,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
}
*((grpc_byte_buffer **)stream_op->payload->recv_message.recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
- GRPC_CLOSURE_SCHED(exec_ctx,
- stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1291,7 +1272,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
- exec_ctx, &oas->s->state.rs.trailing_metadata,
+ &oas->s->state.rs.trailing_metadata,
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
@@ -1316,17 +1297,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete,
+ GRPC_CLOSURE_SCHED(stream_op->on_complete,
GRPC_ERROR_REF(stream_state->cancel_error));
} else if (stream_state->state_callback_received[OP_FAILED]) {
GRPC_CLOSURE_SCHED(
- exec_ctx, stream_op->on_complete,
+ stream_op->on_complete,
make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
} else {
/* All actions in this stream_op are complete. Call the on_complete
* callback
*/
- GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE);
}
oas->state.state_op_done[OP_ON_COMPLETE] = true;
oas->done = true;
@@ -1351,9 +1332,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
Functions used by upper layers to access transport functionality.
*/
-static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_stream_refcount *refcount,
- const void *server_data, gpr_arena *arena) {
+static int init_stream(grpc_transport *gt, grpc_stream *gs,
+ grpc_stream_refcount *refcount, const void *server_data,
+ gpr_arena *arena) {
stream_obj *s = (stream_obj *)gs;
s->refcount = refcount;
@@ -1384,15 +1365,13 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
return 0;
}
-static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_pollset *pollset) {}
+static void set_pollset_do_nothing(grpc_transport *gt, grpc_stream *gs,
+ grpc_pollset *pollset) {}
-static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
- grpc_transport *gt, grpc_stream *gs,
+static void set_pollset_set_do_nothing(grpc_transport *gt, grpc_stream *gs,
grpc_pollset_set *pollset_set) {}
-static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs,
+static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
grpc_transport_stream_op_batch *op) {
CRONET_LOG(GPR_DEBUG, "perform_stream_op");
if (op->send_initial_metadata &&
@@ -1402,42 +1381,36 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
this field is present in metadata */
if (op->recv_initial_metadata) {
GRPC_CLOSURE_SCHED(
- exec_ctx,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED);
}
if (op->recv_message) {
- GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
- GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
return;
}
stream_obj *s = (stream_obj *)gs;
add_to_storage(s, op);
- execute_from_storage(exec_ctx, s);
+ execute_from_storage(s);
}
-static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs,
+static void destroy_stream(grpc_transport *gt, grpc_stream *gs,
grpc_closure *then_schedule_closure) {
stream_obj *s = (stream_obj *)gs;
null_and_maybe_free_read_buffer(s);
/* Clean up read_slice_buffer in case there is unread data. */
- grpc_slice_buffer_destroy_internal(exec_ctx, &s->state.rs.read_slice_buffer);
+ grpc_slice_buffer_destroy_internal(&s->state.rs.read_slice_buffer);
GRPC_ERROR_UNREF(s->state.cancel_error);
- GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
-static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}
+static void destroy_transport(grpc_transport *gt) {}
-static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx,
- grpc_transport *gt) {
- return NULL;
-}
+static grpc_endpoint *get_endpoint(grpc_transport *gt) { return NULL; }
-static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_transport_op *op) {}
+static void perform_op(grpc_transport *gt, grpc_transport_op *op) {}
static const grpc_transport_vtable grpc_cronet_vtable = {
sizeof(stream_obj),