aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Muxi Yan <muxi@users.noreply.github.com>2017-06-26 10:12:51 -0700
committerGravatar GitHub <noreply@github.com>2017-06-26 10:12:51 -0700
commit8399ff6a176126f68f4febea08c5866498a52d51 (patch)
treea6028c22f5c91b60abd45a2619828a4ee5b1f0e6
parent9a69478498232b6b42169f8a1a389b51fb4e03ec (diff)
parent366270eee58df0c59dd3c7453e5ffd0d3d9eebc5 (diff)
Merge pull request #11584 from muxi/fix-cronet-lock-abort
Handle cancel correctly
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c52
1 files changed, 42 insertions, 10 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index ce72fc3d08..29dfa885de 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -766,20 +766,50 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op,
bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
stream_state->state_callback_received[OP_FAILED];
if (is_canceled_or_failed) {
- if (op_id == OP_SEND_INITIAL_METADATA) result = false;
- if (op_id == OP_SEND_MESSAGE) result = false;
- if (op_id == OP_SEND_TRAILING_METADATA) result = false;
- if (op_id == OP_CANCEL_ERROR) result = false;
+ if (op_id == OP_SEND_INITIAL_METADATA) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
+ if (op_id == OP_SEND_MESSAGE) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
+ if (op_id == OP_SEND_TRAILING_METADATA) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
+ if (op_id == OP_CANCEL_ERROR) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
/* already executed */
if (op_id == OP_RECV_INITIAL_METADATA &&
- stream_state->state_op_done[OP_RECV_INITIAL_METADATA])
+ stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
- if (op_id == OP_RECV_MESSAGE &&
- stream_state->state_op_done[OP_RECV_MESSAGE])
+ }
+ if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
+ }
if (op_id == OP_RECV_TRAILING_METADATA &&
- stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
+ stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
+ }
+ /* ON_COMPLETE can be processed if one of the following conditions is met:
+ * 1. the stream failed
+ * 2. the stream is cancelled, and the callback is received
+ * 3. the stream succeeded before cancel is effective
+ * 4. the stream is cancelled, and the stream is never started */
+ if (op_id == OP_ON_COMPLETE &&
+ !(stream_state->state_callback_received[OP_FAILED] ||
+ stream_state->state_callback_received[OP_CANCELED] ||
+ stream_state->state_callback_received[OP_SUCCEEDED] ||
+ !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
} else if (op_id == OP_SEND_INITIAL_METADATA) {
/* already executed */
if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
@@ -868,7 +898,7 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op,
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->recv_message &&
- !stream_state->state_op_done[OP_RECV_MESSAGE]) {
+ !op_state->state_op_done[OP_RECV_MESSAGE]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->cancel_stream &&
@@ -1067,6 +1097,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
@@ -1074,6 +1105,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
@@ -1214,8 +1246,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
} else if (stream_op->cancel_stream &&
op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
- CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
if (s->cbs) {
+ CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
bidirectional_stream_cancel(s->cbs);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {