diff options
author | 2017-01-10 03:13:30 +0000 | |
---|---|---|
committer | 2017-01-10 03:13:30 +0000 | |
commit | 217548d0f9f8b8ea671290bd990d076cc3741139 (patch) | |
tree | 0f96657c954f6ce10be2568b02e5dae7fe0bcb51 /src/core/ext/transport/cronet | |
parent | 871626ff160b6431c3b20ba906d80f4aa7edc0c1 (diff) | |
parent | a3960b98d6a1b85a1e3b89cd7797a509505fa207 (diff) |
Merge remote-tracking branch 'upstream/master' into change_cronet_interface
Diffstat (limited to 'src/core/ext/transport/cronet')
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 52 |
1 files changed, 28 insertions, 24 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 06c7e41dad..b080408565 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -426,6 +426,7 @@ static void on_response_headers_received( bidirectional_stream *stream, const bidirectional_stream_header_array *headers, const char *negotiated_protocol) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, headers, negotiated_protocol); stream_obj *s = (stream_obj *)stream->annotation; @@ -437,7 +438,7 @@ static void on_response_headers_received( grpc_chttp2_incoming_metadata_buffer_add( &s->state.rs.initial_metadata, grpc_mdelem_from_metadata_strings( - grpc_mdstr_from_string(headers->headers[i].key), + &exec_ctx, grpc_mdstr_from_string(headers->headers[i].key), grpc_mdstr_from_string(headers->headers[i].value))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; @@ -454,6 +455,7 @@ static void on_response_headers_received( s->state.rs.remaining_bytes); } gpr_mu_unlock(&s->mu); + grpc_exec_ctx_finish(&exec_ctx); execute_from_storage(s); } @@ -518,6 +520,7 @@ static void on_read_completed(bidirectional_stream *stream, char *data, static void on_response_trailers_received( bidirectional_stream *stream, const bidirectional_stream_header_array *trailers) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); stream_obj *s = (stream_obj *)stream->annotation; @@ -532,7 +535,7 @@ static void on_response_trailers_received( grpc_chttp2_incoming_metadata_buffer_add( &s->state.rs.trailing_metadata, grpc_mdelem_from_metadata_strings( - grpc_mdstr_from_string(trailers->headers[i].key), + &exec_ctx, grpc_mdstr_from_string(trailers->headers[i].key), grpc_mdstr_from_string(trailers->headers[i].value))); s->state.rs.trailing_metadata_valid = true; if (0 == strcmp(trailers->headers[i].key, "grpc-status") && @@ -552,8 +555,10 @@ static void on_response_trailers_received( s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); + grpc_exec_ctx_finish(&exec_ctx); } else { gpr_mu_unlock(&s->mu); + grpc_exec_ctx_finish(&exec_ctx); execute_from_storage(s); } } @@ -845,17 +850,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, stream_op->recv_initial_metadata_ready, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); } else { grpc_chttp2_incoming_metadata_buffer_publish( &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); } stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -906,22 +911,22 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_CANCELLED); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, stream_op->recv_message_ready, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -954,8 +959,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, &stream_state->rs.read_slice_buffer, 0); *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -989,8 +994,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, &stream_state->rs.read_slice_buffer, 0); *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; /* Do an extra read to trigger on_succeeded() callback in case connection @@ -1050,18 +1055,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, OP_ON_COMPLETE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, - GRPC_ERROR_REF(stream_state->cancel_error), NULL); + grpc_closure_sched(exec_ctx, stream_op->on_complete, + GRPC_ERROR_REF(stream_state->cancel_error)); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, stream_op->on_complete, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); } else { /* All actions in this stream_op are complete. Call the on_complete * callback */ - grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, - NULL); + grpc_closure_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE); } oas->state.state_op_done[OP_ON_COMPLETE] = true; oas->done = true; |