diff options
author | Muxi Yan <muxi@users.noreply.github.com> | 2017-06-26 10:12:51 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-26 10:12:51 -0700 |
commit | 8399ff6a176126f68f4febea08c5866498a52d51 (patch) | |
tree | a6028c22f5c91b60abd45a2619828a4ee5b1f0e6 /src | |
parent | 9a69478498232b6b42169f8a1a389b51fb4e03ec (diff) | |
parent | 366270eee58df0c59dd3c7453e5ffd0d3d9eebc5 (diff) |
Merge pull request #11584 from muxi/fix-cronet-lock-abort
Handle cancel correctly
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 52 |
1 files changed, 42 insertions, 10 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index ce72fc3d08..29dfa885de 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -766,20 +766,50 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] || stream_state->state_callback_received[OP_FAILED]; if (is_canceled_or_failed) { - if (op_id == OP_SEND_INITIAL_METADATA) result = false; - if (op_id == OP_SEND_MESSAGE) result = false; - if (op_id == OP_SEND_TRAILING_METADATA) result = false; - if (op_id == OP_CANCEL_ERROR) result = false; + if (op_id == OP_SEND_INITIAL_METADATA) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; + } + if (op_id == OP_SEND_MESSAGE) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; + } + if (op_id == OP_SEND_TRAILING_METADATA) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; + } + if (op_id == OP_CANCEL_ERROR) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; + } /* already executed */ if (op_id == OP_RECV_INITIAL_METADATA && - stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) + stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) { + CRONET_LOG(GPR_DEBUG, "Because"); result = false; - if (op_id == OP_RECV_MESSAGE && - stream_state->state_op_done[OP_RECV_MESSAGE]) + } + if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) { + CRONET_LOG(GPR_DEBUG, "Because"); result = false; + } if (op_id == OP_RECV_TRAILING_METADATA && - stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) + stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { + CRONET_LOG(GPR_DEBUG, "Because"); result = false; + } + /* ON_COMPLETE can be processed if one of the following conditions is met: + * 1. the stream failed + * 2. the stream is cancelled, and the callback is received + * 3. the stream succeeded before cancel is effective + * 4. the stream is cancelled, and the stream is never started */ + if (op_id == OP_ON_COMPLETE && + !(stream_state->state_callback_received[OP_FAILED] || + stream_state->state_callback_received[OP_CANCELED] || + stream_state->state_callback_received[OP_SUCCEEDED] || + !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; + } } else if (op_id == OP_SEND_INITIAL_METADATA) { /* already executed */ if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false; @@ -868,7 +898,7 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, CRONET_LOG(GPR_DEBUG, "Because"); result = false; } else if (curr_op->recv_message && - !stream_state->state_op_done[OP_RECV_MESSAGE]) { + !op_state->state_op_done[OP_RECV_MESSAGE]) { CRONET_LOG(GPR_DEBUG, "Because"); result = false; } else if (curr_op->cancel_stream && @@ -1067,6 +1097,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; + oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); @@ -1074,6 +1105,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; + oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ @@ -1214,8 +1246,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else if (stream_op->cancel_stream && op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); - CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); if (s->cbs) { + CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); bidirectional_stream_cancel(s->cbs); result = ACTION_TAKEN_WITH_CALLBACK; } else { |