aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/cronet/transport
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-06 09:47:54 -0800
committerGravatar GitHub <noreply@github.com>2017-12-06 09:47:54 -0800
commit8cf1470a51ea276ca84825e7495d4ee24743540d (patch)
tree72385cc865094115bc08cb813201d48cb09840bb /src/core/ext/transport/cronet/transport
parent1d4e99508409be052bd129ba507bae1fbe7eb7fa (diff)
Revert "Revert "All instances of exec_ctx being passed around in src/core removed""
Diffstat (limited to 'src/core/ext/transport/cronet/transport')
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.cc173
1 files changed, 69 insertions, 104 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 4d24efe47b..c9fd94176b 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -197,27 +197,23 @@ typedef struct stream_obj stream_obj;
#ifndef NDEBUG
#define GRPC_CRONET_STREAM_REF(stream, reason) \
grpc_cronet_stream_ref((stream), (reason))
-#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
- grpc_cronet_stream_unref((exec_ctx), (stream), (reason))
+#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
+ grpc_cronet_stream_unref((stream), (reason))
void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
grpc_stream_ref(s->refcount, reason);
}
-void grpc_cronet_stream_unref(grpc_exec_ctx* exec_ctx, stream_obj* s,
- const char* reason) {
- grpc_stream_unref(exec_ctx, s->refcount, reason);
+void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
+ grpc_stream_unref(s->refcount, reason);
}
#else
#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
-#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
- grpc_cronet_stream_unref((exec_ctx), (stream))
+#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
+ grpc_cronet_stream_unref((stream))
void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
-void grpc_cronet_stream_unref(grpc_exec_ctx* exec_ctx, stream_obj* s) {
- grpc_stream_unref(exec_ctx, s->refcount);
-}
+void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
#endif
-static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
- struct op_and_state* oas);
+static enum e_op_result execute_stream_op(struct op_and_state* oas);
/*
Utility function to translate enum into string for printing
@@ -373,12 +369,12 @@ static void remove_from_storage(struct stream_obj* s,
This can get executed from the Cronet network thread via cronet callback
or on the application supplied thread via the perform_stream_op function.
*/
-static void execute_from_storage(grpc_exec_ctx* exec_ctx, stream_obj* s) {
+static void execute_from_storage(stream_obj* s) {
gpr_mu_lock(&s->mu);
for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
GPR_ASSERT(curr->done == 0);
- enum e_op_result result = execute_stream_op(exec_ctx, curr);
+ enum e_op_result result = execute_stream_op(curr);
CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
op_result_string(result));
/* if this op is done, then remove it and free memory */
@@ -402,7 +398,7 @@ static void execute_from_storage(grpc_exec_ctx* exec_ctx, stream_obj* s) {
*/
static void on_failed(bidirectional_stream* stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
stream_obj* s = (stream_obj*)stream->annotation;
gpr_mu_lock(&s->mu);
@@ -419,9 +415,8 @@ static void on_failed(bidirectional_stream* stream, int net_error) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
+ GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
}
/*
@@ -429,7 +424,7 @@ static void on_failed(bidirectional_stream* stream, int net_error) {
*/
static void on_canceled(bidirectional_stream* stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
stream_obj* s = (stream_obj*)stream->annotation;
gpr_mu_lock(&s->mu);
@@ -446,9 +441,8 @@ static void on_canceled(bidirectional_stream* stream) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
+ GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
}
/*
@@ -456,7 +450,7 @@ static void on_canceled(bidirectional_stream* stream) {
*/
static void on_succeeded(bidirectional_stream* stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
stream_obj* s = (stream_obj*)stream->annotation;
gpr_mu_lock(&s->mu);
@@ -465,9 +459,8 @@ static void on_succeeded(bidirectional_stream* stream) {
s->cbs = nullptr;
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
+ GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
}
/*
@@ -475,7 +468,7 @@ static void on_succeeded(bidirectional_stream* stream) {
*/
static void on_stream_ready(bidirectional_stream* stream) {
CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
stream_obj* s = (stream_obj*)stream->annotation;
grpc_cronet_transport* t = (grpc_cronet_transport*)s->curr_ct;
gpr_mu_lock(&s->mu);
@@ -495,8 +488,7 @@ static void on_stream_ready(bidirectional_stream* stream) {
}
}
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
}
/*
@@ -506,7 +498,7 @@ static void on_response_headers_received(
bidirectional_stream* stream,
const bidirectional_stream_header_array* headers,
const char* negotiated_protocol) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
headers, negotiated_protocol);
stream_obj* s = (stream_obj*)stream->annotation;
@@ -528,9 +520,8 @@ static void on_response_headers_received(
for (size_t i = 0; i < headers->count; i++) {
GRPC_LOG_IF_ERROR("on_response_headers_received",
grpc_chttp2_incoming_metadata_buffer_add(
- &exec_ctx, &s->state.rs.initial_metadata,
+ &s->state.rs.initial_metadata,
grpc_mdelem_from_slices(
- &exec_ctx,
grpc_slice_intern(grpc_slice_from_static_string(
headers->headers[i].key)),
grpc_slice_intern(grpc_slice_from_static_string(
@@ -552,15 +543,14 @@ static void on_response_headers_received(
s->state.pending_read_from_cronet = true;
}
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
}
/*
Cronet callback
*/
static void on_write_completed(bidirectional_stream* stream, const char* data) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
stream_obj* s = (stream_obj*)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
gpr_mu_lock(&s->mu);
@@ -570,8 +560,7 @@ static void on_write_completed(bidirectional_stream* stream, const char* data) {
}
s->state.state_callback_received[OP_SEND_MESSAGE] = true;
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
- grpc_exec_ctx_finish(&exec_ctx);
+ execute_from_storage(s);
}
/*
@@ -579,7 +568,7 @@ static void on_write_completed(bidirectional_stream* stream, const char* data) {
*/
static void on_read_completed(bidirectional_stream* stream, char* data,
int count) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
stream_obj* s = (stream_obj*)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count);
@@ -605,15 +594,14 @@ static void on_read_completed(bidirectional_stream* stream, char* data,
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
+ execute_from_storage(s);
}
} else {
null_and_maybe_free_read_buffer(s);
s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
+ execute_from_storage(s);
}
- grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -622,7 +610,7 @@ static void on_read_completed(bidirectional_stream* stream, char* data,
static void on_response_trailers_received(
bidirectional_stream* stream,
const bidirectional_stream_header_array* trailers) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers);
stream_obj* s = (stream_obj*)stream->annotation;
@@ -638,9 +626,8 @@ static void on_response_trailers_received(
trailers->headers[i].value);
GRPC_LOG_IF_ERROR("on_response_trailers_received",
grpc_chttp2_incoming_metadata_buffer_add(
- &exec_ctx, &s->state.rs.trailing_metadata,
+ &s->state.rs.trailing_metadata,
grpc_mdelem_from_slices(
- &exec_ctx,
grpc_slice_intern(grpc_slice_from_static_string(
trailers->headers[i].key)),
grpc_slice_intern(grpc_slice_from_static_string(
@@ -670,17 +657,15 @@ static void on_response_trailers_received(
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- execute_from_storage(&exec_ctx, s);
+ execute_from_storage(s);
}
- grpc_exec_ctx_finish(&exec_ctx);
}
/*
Utility function that takes the data from s->write_slice_buffer and assembles
into a contiguous byte stream with 5 byte gRPC header prepended.
*/
-static void create_grpc_frame(grpc_exec_ctx* exec_ctx,
- grpc_slice_buffer* write_slice_buffer,
+static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
char** pp_write_buffer,
size_t* p_write_buffer_size, uint32_t flags) {
grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
@@ -700,7 +685,7 @@ static void create_grpc_frame(grpc_exec_ctx* exec_ctx,
*p++ = (uint8_t)(length);
/* append actual data */
memcpy(p, GRPC_SLICE_START_PTR(slice), length);
- grpc_slice_unref_internal(exec_ctx, slice);
+ grpc_slice_unref_internal(slice);
}
/*
@@ -981,8 +966,7 @@ static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
/*
TODO (makdharma): Break down this function in smaller chunks for readability.
*/
-static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
- struct op_and_state* oas) {
+static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_transport_stream_op_batch* stream_op = &oas->op;
struct stream_obj* s = oas->s;
grpc_cronet_transport* t = (grpc_cronet_transport*)s->curr_ct;
@@ -1040,15 +1024,14 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
if (1 != grpc_byte_stream_next(
- exec_ctx, stream_op->payload->send_message.send_message,
+ stream_op->payload->send_message.send_message,
stream_op->payload->send_message.send_message->length,
nullptr)) {
/* Should never reach here */
GPR_ASSERT(false);
}
if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- stream_op->payload->send_message.send_message,
+ grpc_byte_stream_pull(stream_op->payload->send_message.send_message,
&slice)) {
/* Should never reach here */
GPR_ASSERT(false);
@@ -1061,15 +1044,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
}
if (write_slice_buffer.count > 0) {
size_t write_buffer_size;
- create_grpc_frame(exec_ctx, &write_slice_buffer,
- &stream_state->ws.write_buffer, &write_buffer_size,
+ create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
+ &write_buffer_size,
stream_op->payload->send_message.send_message->flags);
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
(int)write_buffer_size, false);
- grpc_slice_buffer_destroy_internal(exec_ctx, &write_slice_buffer);
+ grpc_slice_buffer_destroy_internal(&write_slice_buffer);
if (t->use_packet_coalescing) {
if (!stream_op->send_trailing_metadata) {
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
@@ -1112,25 +1095,21 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
GRPC_CLOSURE_SCHED(
- exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else if (stream_state->state_callback_received[OP_FAILED]) {
GRPC_CLOSURE_SCHED(
- exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
GRPC_CLOSURE_SCHED(
- exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
- exec_ctx, &oas->s->state.rs.initial_metadata,
+ &oas->s->state.rs.initial_metadata,
stream_op->payload->recv_initial_metadata.recv_initial_metadata);
GRPC_CLOSURE_SCHED(
- exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
}
@@ -1141,16 +1120,14 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
- GRPC_CLOSURE_SCHED(exec_ctx,
- stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
- GRPC_CLOSURE_SCHED(exec_ctx,
- stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1158,16 +1135,14 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
- GRPC_CLOSURE_SCHED(exec_ctx,
- stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->flush_read) {
CRONET_LOG(GPR_DEBUG, "flush read");
- GRPC_CLOSURE_SCHED(exec_ctx,
- stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1200,7 +1175,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
/* Clean up read_slice_buffer in case there is unread data. */
grpc_slice_buffer_destroy_internal(
- exec_ctx, &stream_state->rs.read_slice_buffer);
+ &stream_state->rs.read_slice_buffer);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
&stream_state->rs.read_slice_buffer, 0);
@@ -1210,7 +1185,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
*((grpc_byte_buffer**)stream_op->payload->recv_message.recv_message) =
(grpc_byte_buffer*)&stream_state->rs.sbs;
GRPC_CLOSURE_SCHED(
- exec_ctx, stream_op->payload->recv_message.recv_message_ready,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1254,8 +1229,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
(size_t)stream_state->rs.length_field);
null_and_maybe_free_read_buffer(s);
/* Clean up read_slice_buffer in case there is unread data. */
- grpc_slice_buffer_destroy_internal(exec_ctx,
- &stream_state->rs.read_slice_buffer);
+ grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
read_data_slice);
@@ -1266,8 +1240,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
}
*((grpc_byte_buffer**)stream_op->payload->recv_message.recv_message) =
(grpc_byte_buffer*)&stream_state->rs.sbs;
- GRPC_CLOSURE_SCHED(exec_ctx,
- stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1290,7 +1263,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
- exec_ctx, &oas->s->state.rs.trailing_metadata,
+ &oas->s->state.rs.trailing_metadata,
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
@@ -1315,17 +1288,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete,
+ GRPC_CLOSURE_SCHED(stream_op->on_complete,
GRPC_ERROR_REF(stream_state->cancel_error));
} else if (stream_state->state_callback_received[OP_FAILED]) {
GRPC_CLOSURE_SCHED(
- exec_ctx, stream_op->on_complete,
+ stream_op->on_complete,
make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
} else {
/* All actions in this stream_op are complete. Call the on_complete
* callback
*/
- GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE);
}
oas->state.state_op_done[OP_ON_COMPLETE] = true;
oas->done = true;
@@ -1350,9 +1323,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
Functions used by upper layers to access transport functionality.
*/
-static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
- grpc_stream* gs, grpc_stream_refcount* refcount,
- const void* server_data, gpr_arena* arena) {
+static int init_stream(grpc_transport* gt, grpc_stream* gs,
+ grpc_stream_refcount* refcount, const void* server_data,
+ gpr_arena* arena) {
stream_obj* s = (stream_obj*)gs;
s->refcount = refcount;
@@ -1383,15 +1356,13 @@ static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
return 0;
}
-static void set_pollset_do_nothing(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
- grpc_stream* gs, grpc_pollset* pollset) {}
+static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
+ grpc_pollset* pollset) {}
-static void set_pollset_set_do_nothing(grpc_exec_ctx* exec_ctx,
- grpc_transport* gt, grpc_stream* gs,
+static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
grpc_pollset_set* pollset_set) {}
-static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
- grpc_stream* gs,
+static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
CRONET_LOG(GPR_DEBUG, "perform_stream_op");
if (op->send_initial_metadata &&
@@ -1401,42 +1372,36 @@ static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
this field is present in metadata */
if (op->recv_initial_metadata) {
GRPC_CLOSURE_SCHED(
- exec_ctx,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED);
}
if (op->recv_message) {
- GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
- GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
return;
}
stream_obj* s = (stream_obj*)gs;
add_to_storage(s, op);
- execute_from_storage(exec_ctx, s);
+ execute_from_storage(s);
}
-static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
- grpc_stream* gs,
+static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
grpc_closure* then_schedule_closure) {
stream_obj* s = (stream_obj*)gs;
null_and_maybe_free_read_buffer(s);
/* Clean up read_slice_buffer in case there is unread data. */
- grpc_slice_buffer_destroy_internal(exec_ctx, &s->state.rs.read_slice_buffer);
+ grpc_slice_buffer_destroy_internal(&s->state.rs.read_slice_buffer);
GRPC_ERROR_UNREF(s->state.cancel_error);
- GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
-static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) {}
+static void destroy_transport(grpc_transport* gt) {}
-static grpc_endpoint* get_endpoint(grpc_exec_ctx* exec_ctx,
- grpc_transport* gt) {
- return nullptr;
-}
+static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
-static void perform_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
- grpc_transport_op* op) {}
+static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
static const grpc_transport_vtable grpc_cronet_vtable = {
sizeof(stream_obj),