aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2016-11-28 15:07:28 -0800
committerGravatar Muxi Yan <mxyan@google.com>2016-11-28 16:16:30 -0800
commitea0d61f80666b92cf4dc5fa3b2fc6fc460e48f9e (patch)
treed206b8021be31ebf0ec365217e5f1de2d8036725 /src/core/ext/transport
parent22ea6b8b72ed81af85f01c2914f6e3431048e425 (diff)
Return correct status on cancel
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c52
1 files changed, 34 insertions, 18 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 961e0a5acc..4ac2ebf04e 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -151,6 +151,7 @@ struct op_state {
bool state_callback_received[OP_NUM_OPS];
bool fail_state;
bool flush_read;
+ grpc_error *cancel_error;
/* data structure for storing data coming from server */
struct read_state rs;
/* data structure for storing data going to the server */
@@ -250,6 +251,12 @@ static void free_read_buffer(stream_obj *s) {
}
}
+static grpc_error* make_error_with_desc(int error_code, const char *desc) {
+ grpc_error *error = GRPC_ERROR_CREATE(desc);
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
+ return error;
+}
+
/*
Add a new stream op to op storage.
*/
@@ -817,17 +824,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_SEND_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
- /* This OP is the beginning. Reset various states */
- stream_state->fail_state = stream_state->flush_read = false;
- memset(&s->header_array, 0, sizeof(s->header_array));
- memset(&stream_state->rs, 0, sizeof(stream_state->rs));
- memset(&stream_state->ws, 0, sizeof(stream_state->ws));
- memset(stream_state->state_op_done, 0, sizeof(stream_state->state_op_done));
- memset(stream_state->state_callback_received, 0,
- sizeof(stream_state->state_callback_received));
/* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
* on_failed */
GPR_ASSERT(s->cbs == NULL);
+ GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
&cronet_callbacks);
CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
@@ -848,10 +848,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
- if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
- stream_state->state_callback_received[OP_FAILED]) {
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED, NULL);
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
@@ -905,12 +906,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
- if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
- stream_state->state_callback_received[OP_FAILED]) {
- CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_CANCELLED, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
+ CRONET_LOG(GPR_DEBUG, "Stream failed.");
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
@@ -1031,17 +1036,23 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
if (s->cbs) {
cronet_bidirectional_stream_cancel(s->cbs);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else {
+ result = ACTION_TAKEN_NO_CALLBACK;
}
stream_state->state_op_done[OP_CANCEL_ERROR] = true;
- result = ACTION_TAKEN_WITH_CALLBACK;
+ if (!stream_state->cancel_error) {
+ stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error);
+ }
} else if (stream_op->on_complete &&
op_can_be_run(stream_op, stream_state, &oas->state,
OP_ON_COMPLETE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
- if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
- stream_state->state_callback_received[OP_FAILED]) {
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete,
- GRPC_ERROR_CANCELLED, NULL);
+ GRPC_ERROR_REF(stream_state->cancel_error), NULL);
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
+ grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
} else {
/* All actions in this stream_op are complete. Call the on_complete
* callback
@@ -1096,6 +1107,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
memset(s->state.state_callback_received, 0,
sizeof(s->state.state_callback_received));
+ s->state.fail_state = s->state.flush_read = false;
+ s->state.cancel_error = NULL;
gpr_mu_init(&s->mu);
return 0;
}
@@ -1142,7 +1155,10 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {}
+ grpc_stream *gs, void *and_free_memory) {
+ stream_obj *s = (stream_obj *)gs;
+ GRPC_ERROR_UNREF(s->state.cancel_error);
+}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}