diff options
Diffstat (limited to 'src/core/ext/transport/cronet/transport/cronet_transport.c')
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 121 |
1 files changed, 69 insertions, 52 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 67974b0b6a..29dfa885de 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -1,33 +1,18 @@ /* * - * Copyright 2016, Google Inc. - * All rights reserved. + * Copyright 2016 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -781,20 +766,50 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] || stream_state->state_callback_received[OP_FAILED]; if (is_canceled_or_failed) { - if (op_id == OP_SEND_INITIAL_METADATA) result = false; - if (op_id == OP_SEND_MESSAGE) result = false; - if (op_id == OP_SEND_TRAILING_METADATA) result = false; - if (op_id == OP_CANCEL_ERROR) result = false; + if (op_id == OP_SEND_INITIAL_METADATA) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; + } + if (op_id == OP_SEND_MESSAGE) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; + } + if (op_id == OP_SEND_TRAILING_METADATA) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; + } + if (op_id == OP_CANCEL_ERROR) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; + } /* already executed */ if (op_id == OP_RECV_INITIAL_METADATA && - stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) + stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) { + CRONET_LOG(GPR_DEBUG, "Because"); result = false; - if (op_id == OP_RECV_MESSAGE && - stream_state->state_op_done[OP_RECV_MESSAGE]) + } + if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) { + CRONET_LOG(GPR_DEBUG, "Because"); result = false; + } if (op_id == OP_RECV_TRAILING_METADATA && - stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) + stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { + CRONET_LOG(GPR_DEBUG, "Because"); result = false; + } + /* ON_COMPLETE can be processed if one of the following conditions is met: + * 1. the stream failed + * 2. the stream is cancelled, and the callback is received + * 3. the stream succeeded before cancel is effective + * 4. the stream is cancelled, and the stream is never started */ + if (op_id == OP_ON_COMPLETE && + !(stream_state->state_callback_received[OP_FAILED] || + stream_state->state_callback_received[OP_CANCELED] || + stream_state->state_callback_received[OP_SUCCEEDED] || + !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; + } } else if (op_id == OP_SEND_INITIAL_METADATA) { /* already executed */ if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false; @@ -883,7 +898,7 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, CRONET_LOG(GPR_DEBUG, "Because"); result = false; } else if (curr_op->recv_message && - !stream_state->state_op_done[OP_RECV_MESSAGE]) { + !op_state->state_op_done[OP_RECV_MESSAGE]) { CRONET_LOG(GPR_DEBUG, "Because"); result = false; } else if (curr_op->cancel_stream && @@ -1048,17 +1063,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); @@ -1066,7 +1081,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.initial_metadata, stream_op->payload->recv_initial_metadata.recv_initial_metadata); - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); @@ -1078,22 +1093,24 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); - grpc_closure_sched(exec_ctx, + GRPC_CLOSURE_SCHED(exec_ctx, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; + oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); - grpc_closure_sched(exec_ctx, + GRPC_CLOSURE_SCHED(exec_ctx, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; + oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); - grpc_closure_sched(exec_ctx, + GRPC_CLOSURE_SCHED(exec_ctx, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; @@ -1101,7 +1118,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->flush_read) { CRONET_LOG(GPR_DEBUG, "flush read"); - grpc_closure_sched(exec_ctx, + GRPC_CLOSURE_SCHED(exec_ctx, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; @@ -1142,7 +1159,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, *((grpc_byte_buffer **) stream_op->payload->recv_message.recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; @@ -1196,7 +1213,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } *((grpc_byte_buffer **)stream_op->payload->recv_message.recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_closure_sched(exec_ctx, + GRPC_CLOSURE_SCHED(exec_ctx, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; @@ -1229,8 +1246,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else if (stream_op->cancel_stream && op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); - CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); if (s->cbs) { + CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); bidirectional_stream_cancel(s->cbs); result = ACTION_TAKEN_WITH_CALLBACK; } else { @@ -1245,17 +1262,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_closure_sched(exec_ctx, stream_op->on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete, GRPC_ERROR_REF(stream_state->cancel_error)); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, stream_op->on_complete, make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); } else { /* All actions in this stream_op are complete. Call the on_complete * callback */ - grpc_closure_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE); } oas->state.state_op_done[OP_ON_COMPLETE] = true; oas->done = true; @@ -1327,16 +1344,16 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, /* Cronet does not support :authority header field. We cancel the call when this field is present in metadata */ if (op->recv_initial_metadata) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_CANCELLED); } if (op->recv_message) { - grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready, GRPC_ERROR_CANCELLED); } - grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); return; } stream_obj *s = (stream_obj *)gs; @@ -1350,7 +1367,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, stream_obj *s = (stream_obj *)gs; null_and_maybe_free_read_buffer(s); GRPC_ERROR_UNREF(s->state.cancel_error); - grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} |