From 33aa40ab2b493e0dd40e3a7363e2f62d7d125727 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 9 Sep 2016 14:16:51 -0700 Subject: Add an extra Cronet read after a message read is complete and when trailing metadata is received --- .../transport/cronet/transport/cronet_transport.c | 32 ++++++++++++++++------ 1 file changed, 24 insertions(+), 8 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 25ad40b935..5154fdf3de 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -58,7 +58,7 @@ } while (0) /* TODO (makdharma): Hook up into the wider tracing mechanism */ -int grpc_cronet_trace = 0; +int grpc_cronet_trace = 1; enum e_op_result { ACTION_TAKEN_WITH_CALLBACK, @@ -509,8 +509,20 @@ static void on_response_trailers_received( s->state.rs.trailing_metadata_valid = true; } s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; - gpr_mu_unlock(&s->mu); - execute_from_storage(s); + if (!s->state.state_op_done[OP_READ_REQ_MADE]) { + /* Do an extra read to trigger on_succeeded() callback in case connection + is closed */ + s->state.rs.received_bytes = 0; + s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; + s->state.rs.length_field_received = false; + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); + cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + s->state.rs.remaining_bytes); + gpr_mu_unlock(&s->mu); + } else { + gpr_mu_unlock(&s->mu); + execute_from_storage(s); + } } /* @@ -935,11 +947,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE, NULL); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; - /* Clear read state of the stream, so next read op (if it were to come) - * will work */ - stream_state->rs.received_bytes = stream_state->rs.remaining_bytes = - stream_state->rs.length_field_received = 0; - result = ACTION_TAKEN_NO_CALLBACK; + /* Do an extra read to trigger on_succeeded() callback in case connection + is closed */ + stream_state->rs.received_bytes = 0; + stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; + stream_state->rs.length_field_received = false; + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); + cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, + stream_state->rs.remaining_bytes); + result = ACTION_TAKEN_WITH_CALLBACK; } } else if (stream_op->recv_trailing_metadata && op_can_be_run(stream_op, stream_state, &oas->state, -- cgit v1.2.3 From 6e0d5676ea4559fc23b4a0d865b8072502b012ec Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 9 Sep 2016 17:35:34 -0700 Subject: Minor bug fix --- src/core/ext/transport/cronet/transport/cronet_transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 5154fdf3de..444873988b 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -955,7 +955,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); - result = ACTION_TAKEN_WITH_CALLBACK; + result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_op->recv_trailing_metadata && op_can_be_run(stream_op, stream_state, &oas->state, -- cgit v1.2.3 From f2f3bb89514e4453b5c26458fac333d58a07e85d Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Thu, 15 Sep 2016 16:00:54 -0700 Subject: Add an extra Cronet read after response header is received --- .../transport/cronet/transport/cronet_transport.c | 30 ++++++++++------------ 1 file changed, 14 insertions(+), 16 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 444873988b..8b074f4ea8 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -432,6 +432,15 @@ static void on_response_headers_received( grpc_mdstr_from_string(headers->headers[i].value))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; + /* Do an extra read to trigger on_succeeded() callback in case connection + is closed */ + GPR_ASSERT(s->state.rs.length_field_received == false); + s->state.rs.read_buffer = s->state.rs.grpc_header_bytes; + s->state.rs.received_bytes = 0; + s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); + cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + s->state.rs.remaining_bytes); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -509,20 +518,8 @@ static void on_response_trailers_received( s->state.rs.trailing_metadata_valid = true; } s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; - if (!s->state.state_op_done[OP_READ_REQ_MADE]) { - /* Do an extra read to trigger on_succeeded() callback in case connection - is closed */ - s->state.rs.received_bytes = 0; - s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; - s->state.rs.length_field_received = false; - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); - cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, - s->state.rs.remaining_bytes); - gpr_mu_unlock(&s->mu); - } else { - gpr_mu_unlock(&s->mu); - execute_from_storage(s); - } + gpr_mu_unlock(&s->mu); + execute_from_storage(s); } /* @@ -633,9 +630,9 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, /* When call is canceled, every op can be run, except under following conditions */ - bool is_canceled_of_failed = stream_state->state_op_done[OP_CANCEL_ERROR] || + bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] || stream_state->state_callback_received[OP_FAILED]; - if (is_canceled_of_failed) { + if (is_canceled_or_failed) { if (op_id == OP_SEND_INITIAL_METADATA) result = false; if (op_id == OP_SEND_MESSAGE) result = false; if (op_id == OP_SEND_TRAILING_METADATA) result = false; @@ -949,6 +946,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, oas->state.state_op_done[OP_RECV_MESSAGE] = true; /* Do an extra read to trigger on_succeeded() callback in case connection is closed */ + stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.received_bytes = 0; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.length_field_received = false; -- cgit v1.2.3 From 3ec218ba2620fdfd8e8f5c12c84edad9c909e8fc Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 20 Sep 2016 18:36:57 -0700 Subject: Fix a bug that causes cronet read when cancelled or failed --- .../transport/cronet/transport/cronet_transport.c | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 8b074f4ea8..eefe1a889c 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -432,15 +432,18 @@ static void on_response_headers_received( grpc_mdstr_from_string(headers->headers[i].value))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; - /* Do an extra read to trigger on_succeeded() callback in case connection - is closed */ - GPR_ASSERT(s->state.rs.length_field_received == false); - s->state.rs.read_buffer = s->state.rs.grpc_header_bytes; - s->state.rs.received_bytes = 0; - s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); - cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, - s->state.rs.remaining_bytes); + if (!(s->state.state_op_done[OP_CANCEL_ERROR] || + s->state.state_callback_received[OP_FAILED])) { + /* Do an extra read to trigger on_succeeded() callback in case connection + is closed */ + GPR_ASSERT(s->state.rs.length_field_received == false); + s->state.rs.read_buffer = s->state.rs.grpc_header_bytes; + s->state.rs.received_bytes = 0; + s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); + cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + s->state.rs.remaining_bytes); + } gpr_mu_unlock(&s->mu); execute_from_storage(s); } -- cgit v1.2.3 From 106af4995f936944c581ce8026913fe452ea830a Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Thu, 22 Sep 2016 16:21:31 -0700 Subject: Fix a bug that causes cronet stuck when server initiate termination of stream --- src/core/ext/transport/cronet/transport/cronet_transport.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index eefe1a889c..7457b30d25 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -521,8 +521,18 @@ static void on_response_trailers_received( s->state.rs.trailing_metadata_valid = true; } s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; - gpr_mu_unlock(&s->mu); - execute_from_storage(s); + /* Send a EOS when server terminates the stream to trigger on_succeeded */ + if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA]) { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs); + s->state.state_callback_received[OP_SEND_MESSAGE] = false; + cronet_bidirectional_stream_write(s->cbs, "", 0, true); + s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; + + gpr_mu_unlock(&s->mu); + } else { + gpr_mu_unlock(&s->mu); + execute_from_storage(s); + } } /* -- cgit v1.2.3 From 7faef2188d46a8db60cafc5751712227867e826e Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 23 Sep 2016 16:31:25 -0700 Subject: Add cronet read buffer flush when error grpc-status is received --- .../transport/cronet/transport/cronet_transport.c | 26 +++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 7457b30d25..5e202d0ee2 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -148,6 +148,8 @@ struct write_state { struct op_state { bool state_op_done[OP_NUM_OPS]; bool state_callback_received[OP_NUM_OPS]; + bool fail_state; + bool flush_read; /* data structure for storing data coming from server */ struct read_state rs; /* data structure for storing data going to the server */ @@ -475,7 +477,11 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data, count); gpr_mu_lock(&s->mu); s->state.state_callback_received[OP_RECV_MESSAGE] = true; - if (count > 0) { + if (count > 0 && s->state.flush_read) { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); + cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096); + gpr_mu_unlock(&s->mu); + } else if (count > 0) { s->state.rs.received_bytes += count; s->state.rs.remaining_bytes -= count; if (s->state.rs.remaining_bytes > 0) { @@ -490,6 +496,10 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data, execute_from_storage(s); } } else { + if (s->state.flush_read) { + gpr_free(s->state.rs.read_buffer); + s->state.rs.read_buffer = NULL; + } s->state.rs.read_stream_closed = true; gpr_mu_unlock(&s->mu); execute_from_storage(s); @@ -519,6 +529,10 @@ static void on_response_trailers_received( grpc_mdstr_from_string(trailers->headers[i].key), grpc_mdstr_from_string(trailers->headers[i].value))); s->state.rs.trailing_metadata_valid = true; + if (0 == strcmp(trailers->headers[i].key, "grpc-status") && + 0 != strcmp(trailers->headers[i].value, "0")) { + s->state.fail_state = true; + } } s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; /* Send a EOS when server terminates the stream to trigger on_succeeded */ @@ -790,6 +804,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, OP_SEND_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas); /* This OP is the beginning. Reset various states */ + stream_state->fail_state = stream_state->flush_read = false; memset(&s->header_array, 0, sizeof(s->header_array)); memset(&stream_state->rs, 0, sizeof(stream_state->rs)); memset(&stream_state->ws, 0, sizeof(stream_state->ws)); @@ -1026,6 +1041,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, make a note */ if (stream_op->recv_message) stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true; + } else if (stream_state->fail_state && !stream_state->flush_read) { + CRONET_LOG(GPR_DEBUG, "running: %p flush read", oas); + if (stream_state->rs.read_buffer && + stream_state->rs.read_buffer != stream_state->rs.grpc_header_bytes) { + gpr_free(stream_state->rs.read_buffer); + stream_state->rs.read_buffer = NULL; + } + stream_state->rs.read_buffer = gpr_malloc(4096); + stream_state->flush_read = true; } else { result = NO_ACTION_POSSIBLE; } -- cgit v1.2.3 From 638d03b197c562291024b5f8a1ed9ad08db1b922 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 21 Oct 2016 18:07:48 -0700 Subject: Minor bug fix --- src/core/ext/transport/cronet/transport/cronet_transport.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 5e202d0ee2..71481c0539 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -535,8 +535,9 @@ static void on_response_trailers_received( } } s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; - /* Send a EOS when server terminates the stream to trigger on_succeeded */ - if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA]) { + /* Send a EOS when server terminates the stream (testServerFinishesRequest) to trigger on_succeeded */ + if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] && + !(s->state.state_op_done[OP_CANCEL_ERROR] || s->state.state_callback_received[OP_FAILED])) { CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs); s->state.state_callback_received[OP_SEND_MESSAGE] = false; cronet_bidirectional_stream_write(s->cbs, "", 0, true); -- cgit v1.2.3 From c08f53ab353912ea41c9e342cc597d7604c99816 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 24 Oct 2016 10:25:15 -0700 Subject: Turn off cronet_transport logs and clang-format --- src/core/ext/transport/cronet/transport/cronet_transport.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 71481c0539..5d012921ee 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -58,7 +58,7 @@ } while (0) /* TODO (makdharma): Hook up into the wider tracing mechanism */ -int grpc_cronet_trace = 1; +int grpc_cronet_trace = 0; enum e_op_result { ACTION_TAKEN_WITH_CALLBACK, @@ -535,9 +535,11 @@ static void on_response_trailers_received( } } s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; - /* Send a EOS when server terminates the stream (testServerFinishesRequest) to trigger on_succeeded */ + /* Send a EOS when server terminates the stream (testServerFinishesRequest) to + * trigger on_succeeded */ if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] && - !(s->state.state_op_done[OP_CANCEL_ERROR] || s->state.state_callback_received[OP_FAILED])) { + !(s->state.state_op_done[OP_CANCEL_ERROR] || + s->state.state_callback_received[OP_FAILED])) { CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs); s->state.state_callback_received[OP_SEND_MESSAGE] = false; cronet_bidirectional_stream_write(s->cbs, "", 0, true); -- cgit v1.2.3 From ea0d61f80666b92cf4dc5fa3b2fc6fc460e48f9e Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 28 Nov 2016 15:07:28 -0800 Subject: Return correct status on cancel --- .../transport/cronet/transport/cronet_transport.c | 52 ++++++++++++++-------- 1 file changed, 34 insertions(+), 18 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 961e0a5acc..4ac2ebf04e 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -151,6 +151,7 @@ struct op_state { bool state_callback_received[OP_NUM_OPS]; bool fail_state; bool flush_read; + grpc_error *cancel_error; /* data structure for storing data coming from server */ struct read_state rs; /* data structure for storing data going to the server */ @@ -250,6 +251,12 @@ static void free_read_buffer(stream_obj *s) { } } +static grpc_error* make_error_with_desc(int error_code, const char *desc) { + grpc_error *error = GRPC_ERROR_CREATE(desc); + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code); + return error; +} + /* Add a new stream op to op storage. */ @@ -817,17 +824,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, op_can_be_run(stream_op, stream_state, &oas->state, OP_SEND_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas); - /* This OP is the beginning. Reset various states */ - stream_state->fail_state = stream_state->flush_read = false; - memset(&s->header_array, 0, sizeof(s->header_array)); - memset(&stream_state->rs, 0, sizeof(stream_state->rs)); - memset(&stream_state->ws, 0, sizeof(stream_state->ws)); - memset(stream_state->state_op_done, 0, sizeof(stream_state->state_op_done)); - memset(stream_state->state_callback_received, 0, - sizeof(stream_state->state_callback_received)); /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled, * on_failed */ GPR_ASSERT(s->cbs == NULL); + GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]); s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, &cronet_callbacks); CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs); @@ -848,10 +848,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); - if (stream_state->state_op_done[OP_CANCEL_ERROR] || - stream_state->state_callback_received[OP_FAILED]) { + if (stream_state->state_op_done[OP_CANCEL_ERROR]) { grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_CANCELLED, NULL); + } else if (stream_state->state_callback_received[OP_FAILED]) { + grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); } else { grpc_chttp2_incoming_metadata_buffer_publish( &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); @@ -905,12 +906,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); - if (stream_state->state_op_done[OP_CANCEL_ERROR] || - stream_state->state_callback_received[OP_FAILED]) { - CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); + if (stream_state->state_op_done[OP_CANCEL_ERROR]) { + CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_CANCELLED, NULL); stream_state->state_op_done[OP_RECV_MESSAGE] = true; + } else if (stream_state->state_callback_received[OP_FAILED]) { + CRONET_LOG(GPR_DEBUG, "Stream failed."); + grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + stream_state->state_op_done[OP_RECV_MESSAGE] = true; } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); @@ -1031,17 +1036,23 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs); if (s->cbs) { cronet_bidirectional_stream_cancel(s->cbs); + result = ACTION_TAKEN_WITH_CALLBACK; + } else { + result = ACTION_TAKEN_NO_CALLBACK; } stream_state->state_op_done[OP_CANCEL_ERROR] = true; - result = ACTION_TAKEN_WITH_CALLBACK; + if (!stream_state->cancel_error) { + stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error); + } } else if (stream_op->on_complete && op_can_be_run(stream_op, stream_state, &oas->state, OP_ON_COMPLETE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); - if (stream_state->state_op_done[OP_CANCEL_ERROR] || - stream_state->state_callback_received[OP_FAILED]) { + if (stream_state->state_op_done[OP_CANCEL_ERROR]) { grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, - GRPC_ERROR_CANCELLED, NULL); + GRPC_ERROR_REF(stream_state->cancel_error), NULL); + } else if (stream_state->state_callback_received[OP_FAILED]) { + grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); } else { /* All actions in this stream_op are complete. Call the on_complete * callback @@ -1096,6 +1107,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done)); memset(s->state.state_callback_received, 0, sizeof(s->state.state_callback_received)); + s->state.fail_state = s->state.flush_read = false; + s->state.cancel_error = NULL; gpr_mu_init(&s->mu); return 0; } @@ -1142,7 +1155,10 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) {} + grpc_stream *gs, void *and_free_memory) { + stream_obj *s = (stream_obj *)gs; + GRPC_ERROR_UNREF(s->state.cancel_error); +} static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} -- cgit v1.2.3 From a6b88dfcd642784d6566b342d086f862c717d138 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 28 Nov 2016 16:19:28 -0800 Subject: Change behavior of RECV_MESSAGE --- src/core/ext/transport/cronet/transport/cronet_transport.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 4ac2ebf04e..69e2e27c08 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -59,7 +59,7 @@ } while (0) /* TODO (makdharma): Hook up into the wider tracing mechanism */ -int grpc_cronet_trace = 0; +int grpc_cronet_trace = 1; enum e_op_result { ACTION_TAKEN_WITH_CALLBACK, @@ -911,11 +911,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_CANCELLED, NULL); stream_state->state_op_done[OP_RECV_MESSAGE] = true; + result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); stream_state->state_op_done[OP_RECV_MESSAGE] = true; + result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); @@ -923,6 +925,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE, NULL); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; + result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.length_field_received == false) { if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && stream_state->rs.remaining_bytes == 0) { -- cgit v1.2.3 From 72e409655ece13b84f8b110a7bcb904187737254 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 29 Nov 2016 10:07:43 -0800 Subject: clang-format --- .../ext/transport/cronet/transport/cronet_transport.c | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 69e2e27c08..c3abb47735 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -251,7 +251,7 @@ static void free_read_buffer(stream_obj *s) { } } -static grpc_error* make_error_with_desc(int error_code, const char *desc) { +static grpc_error *make_error_with_desc(int error_code, const char *desc) { grpc_error *error = GRPC_ERROR_CREATE(desc); error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code); return error; @@ -852,7 +852,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_CANCELLED, NULL); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + grpc_exec_ctx_sched( + exec_ctx, stream_op->recv_initial_metadata_ready, + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); } else { grpc_chttp2_incoming_metadata_buffer_publish( &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); @@ -914,8 +916,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + grpc_exec_ctx_sched( + exec_ctx, stream_op->recv_message_ready, + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.read_stream_closed == true) { @@ -1055,7 +1058,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_REF(stream_state->cancel_error), NULL); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + grpc_exec_ctx_sched( + exec_ctx, stream_op->on_complete, + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); } else { /* All actions in this stream_op are complete. Call the on_complete * callback -- cgit v1.2.3 From 99080d1488ab1d286d2019a15b3f18f0a6e630fb Mon Sep 17 00:00:00 2001 From: Robbie Shade Date: Tue, 29 Nov 2016 14:59:57 -0500 Subject: Fix TSAN failure when running DEBUG mode. --- src/core/lib/surface/completion_queue.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/core') diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 4e0feb56ac..184c1a1a16 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -354,11 +354,13 @@ static void dump_pending_tags(grpc_completion_queue *cc) { gpr_strvec v; gpr_strvec_init(&v); gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:")); + gpr_mu_lock(cc->mu); for (size_t i = 0; i < cc->outstanding_tag_count; i++) { char *s; gpr_asprintf(&s, " %p", cc->outstanding_tags[i]); gpr_strvec_add(&v, s); } + gpr_mu_unlock(cc->mu); char *out = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); gpr_log(GPR_DEBUG, "%s", out); -- cgit v1.2.3 From db25f083c2ec0e2912aecaed4ae2826865305320 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 2 Dec 2016 12:45:24 -0800 Subject: Make TCP error messages more descriptive --- src/core/lib/iomgr/tcp_posix.c | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 12a4797e6f..fd80779519 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -107,6 +107,12 @@ typedef struct { grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; +static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) { + return grpc_error_set_str( + grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd), + GRPC_ERROR_STR_TARGET_ADDRESS, tcp->peer_string); +} + static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, @@ -230,13 +236,15 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg")); + call_read_cb(exec_ctx, tcp, + tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp)); TCP_UNREF(exec_ctx, tcp, "read"); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("Socket closed")); + call_read_cb(exec_ctx, tcp, + tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp)); TCP_UNREF(exec_ctx, tcp, "read"); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); @@ -366,7 +374,7 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { tcp->outgoing_byte_idx = unwind_byte_idx; return false; } else { - *error = GRPC_OS_ERROR(errno, "sendmsg"); + *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); return true; } } @@ -447,9 +455,10 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_sched(exec_ctx, cb, grpc_fd_is_shutdown(tcp->em_fd) - ? GRPC_ERROR_CREATE("EOF") - : GRPC_ERROR_NONE, + grpc_exec_ctx_sched(exec_ctx, cb, + grpc_fd_is_shutdown(tcp->em_fd) + ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp) + : GRPC_ERROR_NONE, NULL); return; } -- cgit v1.2.3 From 58d68f0d49fb2eaf03f19f6775f4f19a611899bc Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 7 Dec 2016 11:20:57 -0800 Subject: Turn off Cronet logging --- src/core/ext/transport/cronet/transport/cronet_transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index c3abb47735..afc59f4b12 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -59,7 +59,7 @@ } while (0) /* TODO (makdharma): Hook up into the wider tracing mechanism */ -int grpc_cronet_trace = 1; +int grpc_cronet_trace = 0; enum e_op_result { ACTION_TAKEN_WITH_CALLBACK, -- cgit v1.2.3 From 3da4188176eb76a1a786249ac840b0a83242e397 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 6 Dec 2016 15:40:19 -0800 Subject: Fix handling of streams waiting for concurrency --- src/core/ext/transport/chttp2/transport/chttp2_transport.c | 1 + src/core/ext/transport/chttp2/transport/internal.h | 2 ++ src/core/ext/transport/chttp2/transport/stream_lists.c | 5 +++++ 3 files changed, 8 insertions(+) (limited to 'src/core') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 3b84898fee..72711d69dc 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1623,6 +1623,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, remove_stream(exec_ctx, t, s->id, removal_error(GRPC_ERROR_REF(error), s, "Stream removed")); } + grpc_chttp2_list_remove_waiting_for_concurrency(t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2"); } GRPC_ERROR_UNREF(error); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 31eb1e01ac..b727965d43 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -496,6 +496,8 @@ void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t, grpc_chttp2_stream *s); int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t, grpc_chttp2_stream **s); +void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t, + grpc_chttp2_stream *s); void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t, grpc_chttp2_stream *s); diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 6d25b3ae57..a60264cc51 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -158,6 +158,11 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t, return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); } +void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { + stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); +} + void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); -- cgit v1.2.3 From ec7ba3931741f349b59f531d35677891a0a24baf Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 7 Dec 2016 10:50:30 -0800 Subject: PR comments --- src/core/ext/transport/chttp2/transport/chttp2_transport.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 72711d69dc..6bc054866b 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1622,8 +1622,10 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, if (s->id != 0) { remove_stream(exec_ctx, t, s->id, removal_error(GRPC_ERROR_REF(error), s, "Stream removed")); + } else { + /* Purge streams waiting on concurrency still waiting for id assignment */ + grpc_chttp2_list_remove_waiting_for_concurrency(t, s); } - grpc_chttp2_list_remove_waiting_for_concurrency(t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2"); } GRPC_ERROR_UNREF(error); -- cgit v1.2.3