aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/cronet/transport/cronet_transport.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/cronet/transport/cronet_transport.cc')
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.cc173
1 files changed, 104 insertions, 69 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index c9fd94176b..4d24efe47b 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -197,23 +197,27 @@ typedef struct stream_obj stream_obj;
#ifndef NDEBUG
#define GRPC_CRONET_STREAM_REF(stream, reason) \
grpc_cronet_stream_ref((stream), (reason))
-#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
- grpc_cronet_stream_unref((stream), (reason))
+#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
+ grpc_cronet_stream_unref((exec_ctx), (stream), (reason))
void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
grpc_stream_ref(s->refcount, reason);
}
-void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
- grpc_stream_unref(s->refcount, reason);
+void grpc_cronet_stream_unref(grpc_exec_ctx* exec_ctx, stream_obj* s,
+ const char* reason) {
+ grpc_stream_unref(exec_ctx, s->refcount, reason);
}
#else
#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
-#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
- grpc_cronet_stream_unref((stream))
+#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
+ grpc_cronet_stream_unref((exec_ctx), (stream))
void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
-void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
+void grpc_cronet_stream_unref(grpc_exec_ctx* exec_ctx, stream_obj* s) {
+ grpc_stream_unref(exec_ctx, s->refcount);
+}
#endif
-static enum e_op_result execute_stream_op(struct op_and_state* oas);
+static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
+ struct op_and_state* oas);
/*
Utility function to translate enum into string for printing
@@ -369,12 +373,12 @@ static void remove_from_storage(struct stream_obj* s,
This can get executed from the Cronet network thread via cronet callback
or on the application supplied thread via the perform_stream_op function.
*/
-static void execute_from_storage(stream_obj* s) {
+static void execute_from_storage(grpc_exec_ctx* exec_ctx, stream_obj* s) {
gpr_mu_lock(&s->mu);
for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
GPR_ASSERT(curr->done == 0);
- enum e_op_result result = execute_stream_op(curr);
+ enum e_op_result result = execute_stream_op(exec_ctx, curr);
CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
op_result_string(result));
/* if this op is done, then remove it and free memory */
@@ -398,7 +402,7 @@ static void execute_from_storage(stream_obj* s) {
*/
static void on_failed(bidirectional_stream* stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj* s = (stream_obj*)stream->annotation;
gpr_mu_lock(&s->mu);
@@ -415,8 +419,9 @@ static void on_failed(bidirectional_stream* stream, int net_error) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
- GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
+ execute_from_storage(&exec_ctx, s);
+ GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -424,7 +429,7 @@ static void on_failed(bidirectional_stream* stream, int net_error) {
*/
static void on_canceled(bidirectional_stream* stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj* s = (stream_obj*)stream->annotation;
gpr_mu_lock(&s->mu);
@@ -441,8 +446,9 @@ static void on_canceled(bidirectional_stream* stream) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
- GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
+ execute_from_storage(&exec_ctx, s);
+ GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -450,7 +456,7 @@ static void on_canceled(bidirectional_stream* stream) {
*/
static void on_succeeded(bidirectional_stream* stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj* s = (stream_obj*)stream->annotation;
gpr_mu_lock(&s->mu);
@@ -459,8 +465,9 @@ static void on_succeeded(bidirectional_stream* stream) {
s->cbs = nullptr;
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
- GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
+ execute_from_storage(&exec_ctx, s);
+ GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -468,7 +475,7 @@ static void on_succeeded(bidirectional_stream* stream) {
*/
static void on_stream_ready(bidirectional_stream* stream) {
CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj* s = (stream_obj*)stream->annotation;
grpc_cronet_transport* t = (grpc_cronet_transport*)s->curr_ct;
gpr_mu_lock(&s->mu);
@@ -488,7 +495,8 @@ static void on_stream_ready(bidirectional_stream* stream) {
}
}
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -498,7 +506,7 @@ static void on_response_headers_received(
bidirectional_stream* stream,
const bidirectional_stream_header_array* headers,
const char* negotiated_protocol) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
headers, negotiated_protocol);
stream_obj* s = (stream_obj*)stream->annotation;
@@ -520,8 +528,9 @@ static void on_response_headers_received(
for (size_t i = 0; i < headers->count; i++) {
GRPC_LOG_IF_ERROR("on_response_headers_received",
grpc_chttp2_incoming_metadata_buffer_add(
- &s->state.rs.initial_metadata,
+ &exec_ctx, &s->state.rs.initial_metadata,
grpc_mdelem_from_slices(
+ &exec_ctx,
grpc_slice_intern(grpc_slice_from_static_string(
headers->headers[i].key)),
grpc_slice_intern(grpc_slice_from_static_string(
@@ -543,14 +552,15 @@ static void on_response_headers_received(
s->state.pending_read_from_cronet = true;
}
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
Cronet callback
*/
static void on_write_completed(bidirectional_stream* stream, const char* data) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj* s = (stream_obj*)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
gpr_mu_lock(&s->mu);
@@ -560,7 +570,8 @@ static void on_write_completed(bidirectional_stream* stream, const char* data) {
}
s->state.state_callback_received[OP_SEND_MESSAGE] = true;
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -568,7 +579,7 @@ static void on_write_completed(bidirectional_stream* stream, const char* data) {
*/
static void on_read_completed(bidirectional_stream* stream, char* data,
int count) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj* s = (stream_obj*)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count);
@@ -594,14 +605,15 @@ static void on_read_completed(bidirectional_stream* stream, char* data,
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
}
} else {
null_and_maybe_free_read_buffer(s);
s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
}
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -610,7 +622,7 @@ static void on_read_completed(bidirectional_stream* stream, char* data,
static void on_response_trailers_received(
bidirectional_stream* stream,
const bidirectional_stream_header_array* trailers) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers);
stream_obj* s = (stream_obj*)stream->annotation;
@@ -626,8 +638,9 @@ static void on_response_trailers_received(
trailers->headers[i].value);
GRPC_LOG_IF_ERROR("on_response_trailers_received",
grpc_chttp2_incoming_metadata_buffer_add(
- &s->state.rs.trailing_metadata,
+ &exec_ctx, &s->state.rs.trailing_metadata,
grpc_mdelem_from_slices(
+ &exec_ctx,
grpc_slice_intern(grpc_slice_from_static_string(
trailers->headers[i].key)),
grpc_slice_intern(grpc_slice_from_static_string(
@@ -657,15 +670,17 @@ static void on_response_trailers_received(
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
}
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
Utility function that takes the data from s->write_slice_buffer and assembles
into a contiguous byte stream with 5 byte gRPC header prepended.
*/
-static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
+static void create_grpc_frame(grpc_exec_ctx* exec_ctx,
+ grpc_slice_buffer* write_slice_buffer,
char** pp_write_buffer,
size_t* p_write_buffer_size, uint32_t flags) {
grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
@@ -685,7 +700,7 @@ static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
*p++ = (uint8_t)(length);
/* append actual data */
memcpy(p, GRPC_SLICE_START_PTR(slice), length);
- grpc_slice_unref_internal(slice);
+ grpc_slice_unref_internal(exec_ctx, slice);
}
/*
@@ -966,7 +981,8 @@ static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
/*
TODO (makdharma): Break down this function in smaller chunks for readability.
*/
-static enum e_op_result execute_stream_op(struct op_and_state* oas) {
+static enum e_op_result execute_stream_op(grpc_exec_ctx* exec_ctx,
+ struct op_and_state* oas) {
grpc_transport_stream_op_batch* stream_op = &oas->op;
struct stream_obj* s = oas->s;
grpc_cronet_transport* t = (grpc_cronet_transport*)s->curr_ct;
@@ -1024,14 +1040,15 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
if (1 != grpc_byte_stream_next(
- stream_op->payload->send_message.send_message,
+ exec_ctx, stream_op->payload->send_message.send_message,
stream_op->payload->send_message.send_message->length,
nullptr)) {
/* Should never reach here */
GPR_ASSERT(false);
}
if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(stream_op->payload->send_message.send_message,
+ grpc_byte_stream_pull(exec_ctx,
+ stream_op->payload->send_message.send_message,
&slice)) {
/* Should never reach here */
GPR_ASSERT(false);
@@ -1044,15 +1061,15 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
}
if (write_slice_buffer.count > 0) {
size_t write_buffer_size;
- create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
- &write_buffer_size,
+ create_grpc_frame(exec_ctx, &write_slice_buffer,
+ &stream_state->ws.write_buffer, &write_buffer_size,
stream_op->payload->send_message.send_message->flags);
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
(int)write_buffer_size, false);
- grpc_slice_buffer_destroy_internal(&write_slice_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &write_slice_buffer);
if (t->use_packet_coalescing) {
if (!stream_op->send_trailing_metadata) {
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
@@ -1095,21 +1112,25 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
GRPC_CLOSURE_SCHED(
+ exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else if (stream_state->state_callback_received[OP_FAILED]) {
GRPC_CLOSURE_SCHED(
+ exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
GRPC_CLOSURE_SCHED(
+ exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
- &oas->s->state.rs.initial_metadata,
+ exec_ctx, &oas->s->state.rs.initial_metadata,
stream_op->payload->recv_initial_metadata.recv_initial_metadata);
GRPC_CLOSURE_SCHED(
+ exec_ctx,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
}
@@ -1120,14 +1141,16 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
- GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
- GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1135,14 +1158,16 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
- GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->flush_read) {
CRONET_LOG(GPR_DEBUG, "flush read");
- GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1175,7 +1200,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
/* Clean up read_slice_buffer in case there is unread data. */
grpc_slice_buffer_destroy_internal(
- &stream_state->rs.read_slice_buffer);
+ exec_ctx, &stream_state->rs.read_slice_buffer);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
&stream_state->rs.read_slice_buffer, 0);
@@ -1185,7 +1210,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
*((grpc_byte_buffer**)stream_op->payload->recv_message.recv_message) =
(grpc_byte_buffer*)&stream_state->rs.sbs;
GRPC_CLOSURE_SCHED(
- stream_op->payload->recv_message.recv_message_ready,
+ exec_ctx, stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1229,7 +1254,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
(size_t)stream_state->rs.length_field);
null_and_maybe_free_read_buffer(s);
/* Clean up read_slice_buffer in case there is unread data. */
- grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx,
+ &stream_state->rs.read_slice_buffer);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
read_data_slice);
@@ -1240,7 +1266,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
}
*((grpc_byte_buffer**)stream_op->payload->recv_message.recv_message) =
(grpc_byte_buffer*)&stream_state->rs.sbs;
- GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
@@ -1263,7 +1290,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
- &oas->s->state.rs.trailing_metadata,
+ exec_ctx, &oas->s->state.rs.trailing_metadata,
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
@@ -1288,17 +1315,17 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- GRPC_CLOSURE_SCHED(stream_op->on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete,
GRPC_ERROR_REF(stream_state->cancel_error));
} else if (stream_state->state_callback_received[OP_FAILED]) {
GRPC_CLOSURE_SCHED(
- stream_op->on_complete,
+ exec_ctx, stream_op->on_complete,
make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
} else {
/* All actions in this stream_op are complete. Call the on_complete
* callback
*/
- GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE);
}
oas->state.state_op_done[OP_ON_COMPLETE] = true;
oas->done = true;
@@ -1323,9 +1350,9 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
Functions used by upper layers to access transport functionality.
*/
-static int init_stream(grpc_transport* gt, grpc_stream* gs,
- grpc_stream_refcount* refcount, const void* server_data,
- gpr_arena* arena) {
+static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs, grpc_stream_refcount* refcount,
+ const void* server_data, gpr_arena* arena) {
stream_obj* s = (stream_obj*)gs;
s->refcount = refcount;
@@ -1356,13 +1383,15 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs,
return 0;
}
-static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
- grpc_pollset* pollset) {}
+static void set_pollset_do_nothing(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs, grpc_pollset* pollset) {}
-static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
+static void set_pollset_set_do_nothing(grpc_exec_ctx* exec_ctx,
+ grpc_transport* gt, grpc_stream* gs,
grpc_pollset_set* pollset_set) {}
-static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
+static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
CRONET_LOG(GPR_DEBUG, "perform_stream_op");
if (op->send_initial_metadata &&
@@ -1372,36 +1401,42 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
this field is present in metadata */
if (op->recv_initial_metadata) {
GRPC_CLOSURE_SCHED(
+ exec_ctx,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED);
}
if (op->recv_message) {
- GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
- GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED);
return;
}
stream_obj* s = (stream_obj*)gs;
add_to_storage(s, op);
- execute_from_storage(s);
+ execute_from_storage(exec_ctx, s);
}
-static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
+static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs,
grpc_closure* then_schedule_closure) {
stream_obj* s = (stream_obj*)gs;
null_and_maybe_free_read_buffer(s);
/* Clean up read_slice_buffer in case there is unread data. */
- grpc_slice_buffer_destroy_internal(&s->state.rs.read_slice_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->state.rs.read_slice_buffer);
GRPC_ERROR_UNREF(s->state.cancel_error);
- GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}
-static void destroy_transport(grpc_transport* gt) {}
+static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) {}
-static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
+static grpc_endpoint* get_endpoint(grpc_exec_ctx* exec_ctx,
+ grpc_transport* gt) {
+ return nullptr;
+}
-static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
+static void perform_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_transport_op* op) {}
static const grpc_transport_vtable grpc_cronet_vtable = {
sizeof(stream_obj),