diff options
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 98 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/frame_data.c | 29 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/hpack_parser.c | 3 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 11 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/stream_map.c | 2 | ||||
-rw-r--r-- | test/cpp/end2end/thread_stress_test.cc | 5 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 3 |
7 files changed, 100 insertions, 51 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index ec9de47725..5484d9a1ec 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -447,7 +447,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); - s->in_stream_map = true; } GPR_TIMER_END("init_stream", 0); @@ -464,7 +463,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_TIMER_BEGIN("destroy_stream", 0); GPR_ASSERT((s->write_closed && s->read_closed) || s->id == 0); - GPR_ASSERT(!s->in_stream_map); if (s->id != 0) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL); } @@ -540,16 +538,23 @@ grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx, static const char *write_state_name(grpc_chttp2_write_state st) { switch (st) { - case GRPC_CHTTP2_WRITE_STATE_IDLE: return "IDLE"; - case GRPC_CHTTP2_WRITE_STATE_WRITING: return "WRITING"; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME: return "WRITING+MORE"; + case GRPC_CHTTP2_WRITE_STATE_IDLE: + return "IDLE"; + case GRPC_CHTTP2_WRITE_STATE_WRITING: + return "WRITING"; + case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME: + return "WRITING+MORE"; } GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -static void set_write_state(grpc_chttp2_transport *t, grpc_chttp2_write_state st) { - GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s", t, t->is_client ? "CLIENT" : "SERVER", write_state_name(t->write_state), write_state_name(st))); - t->write_state = st; +static void set_write_state(grpc_chttp2_transport *t, + grpc_chttp2_write_state st) { + GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s", t, + t->is_client ? "CLIENT" : "SERVER", + write_state_name(t->write_state), + write_state_name(st))); + t->write_state = st; } void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, @@ -718,7 +723,6 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes); grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); - s->in_stream_map = true; grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream"); } /* cancel out streams that will never be started */ @@ -740,7 +744,8 @@ static grpc_closure *add_closure_barrier(grpc_closure *closure) { return closure; } -static void run_closure_and_null(grpc_exec_ctx *exec_ctx, grpc_closure **closure, grpc_error *error) { +static void null_then_run_closure(grpc_exec_ctx *exec_ctx, + grpc_closure **closure, grpc_error *error) { grpc_closure *c = *closure; *closure = NULL; grpc_closure_run(exec_ctx, c, error); @@ -805,7 +810,8 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, ssize_t notify_offset = s->fetching_slice_end_offset; if (notify_offset <= 0) { grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, "fetching_send_message_finished"); + exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, + "fetching_send_message_finished"); } else { grpc_chttp2_write_cb *cb = t->write_cb_pool; if (cb == NULL) { @@ -947,7 +953,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_CREATE( - "Attempt to send initial metadata after stream was closed"), "send_initial_metadata_finished"); + "Attempt to send initial metadata after stream was closed"), + "send_initial_metadata_finished"); } } } @@ -957,7 +964,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (s->write_closed) { grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->fetching_send_message_finished, - GRPC_ERROR_CREATE("Attempt to send message after stream was closed"), "fetching_send_message_finished"); + GRPC_ERROR_CREATE("Attempt to send message after stream was closed"), + "fetching_send_message_finished"); } else { GPR_ASSERT(s->fetching_send_message == NULL); uint8_t *frame_hdr = @@ -1015,7 +1023,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_metadata_batch_is_empty(op->send_trailing_metadata) ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("Attempt to send trailing metadata after " - "stream was closed"), "send_trailing_metadata_finished"); + "stream was closed"), + "send_trailing_metadata_finished"); } else if (s->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ @@ -1193,16 +1202,19 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { grpc_byte_stream *bs; - if (s->recv_initial_metadata_ready != NULL && s->published_metadata[0]) { + if (s->recv_initial_metadata_ready != NULL && + s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) { if (s->seen_error) { while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != NULL) { + gpr_log(GPR_DEBUG, "discard %p", bs); incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } } grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0], s->recv_initial_metadata); - run_closure_and_null(exec_ctx, &s->recv_initial_metadata_ready, GRPC_ERROR_NONE); + null_then_run_closure(exec_ctx, &s->recv_initial_metadata_ready, + GRPC_ERROR_NONE); } } @@ -1214,16 +1226,22 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, while (s->final_metadata_requested && s->seen_error && (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != NULL) { + gpr_log(GPR_DEBUG, "discard %p", bs); incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } if (s->incoming_frames.head != NULL) { *s->recv_message = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames); GPR_ASSERT(*s->recv_message != NULL); - run_closure_and_null(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); - } else if (s->published_metadata[1]) { + null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); + } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) { + const char *m = grpc_error_string(s->read_closed_error); + gpr_log(GPR_ERROR, "publish null :: %s", m); + abort(); + grpc_error_free_string(m); + *s->recv_message = NULL; - run_closure_and_null(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); + null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); } } } @@ -1238,14 +1256,17 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, if (s->seen_error) { while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != NULL) { + gpr_log(GPR_DEBUG, "discard %p", bs); incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } } - if (s->all_incoming_byte_streams_finished && s->recv_trailing_metadata_finished != NULL) { + if (s->all_incoming_byte_streams_finished && + s->recv_trailing_metadata_finished != NULL) { grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1], s->recv_trailing_metadata); grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE, "recv_trailing_metadata_finished"); + exec_ctx, t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE, + "recv_trailing_metadata_finished"); } } } @@ -1262,7 +1283,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint32_t id, grpc_error *error) { grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id); GPR_ASSERT(s); - s->in_stream_map = false; if (t->incoming_stream == s) { t->incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); @@ -1359,7 +1379,8 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, to the upper layers - drop what we've got, and then publish what we want - which is safe because we haven't told anyone about the metadata yet */ - if (!s->published_metadata[1] || s->recv_trailing_metadata_finished != NULL) { + if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED || + s->recv_trailing_metadata_finished != NULL) { char status_string[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(status, status_string); grpc_chttp2_incoming_metadata_buffer_add( @@ -1373,7 +1394,8 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_slice(gpr_slice_ref(*slice)))); } - s->published_metadata[1] = true; + gpr_log(GPR_DEBUG, "published_metadata from fake"); + s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } if (slice) { @@ -1413,20 +1435,21 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_error *error) { error = removal_error(error, s); s->fetching_send_message = NULL; - grpc_chttp2_complete_closure_step(exec_ctx, t, s, - &s->send_initial_metadata_finished, - GRPC_ERROR_REF(error), "send_initial_metadata_finished"); - grpc_chttp2_complete_closure_step(exec_ctx, t, s, - &s->send_trailing_metadata_finished, - GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); - grpc_chttp2_complete_closure_step(exec_ctx, t, s, - &s->fetching_send_message_finished, - GRPC_ERROR_REF(error), "fetching_send_message_finished"); + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error), + "send_initial_metadata_finished"); + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->send_trailing_metadata_finished, + GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), + "fetching_send_message_finished"); while (s->on_write_finished_cbs) { grpc_chttp2_write_cb *cb = s->on_write_finished_cbs; s->on_write_finished_cbs = cb->next; grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, - GRPC_ERROR_REF(error), "on_write_finished_cb"); + GRPC_ERROR_REF(error), + "on_write_finished_cb"); cb->next = t->write_cb_pool; t->write_cb_pool = cb; } @@ -1445,8 +1468,11 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, if (close_reads && !s->read_closed) { s->read_closed_error = GRPC_ERROR_REF(error); s->read_closed = true; - s->published_metadata[0] = true; - s->published_metadata[1] = true; + for (int i = 0; i < 2; i++) { + if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) { + s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE; + } + } decrement_active_streams_locked(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index bcb0ab0f99..8668816930 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -140,23 +140,17 @@ void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, stats->data_bytes += write_bytes; } -grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - gpr_slice slice, int is_last) { +static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx, + grpc_chttp2_data_parser *p, + grpc_chttp2_transport *t, grpc_chttp2_stream *s, + gpr_slice slice) { uint8_t *const beg = GPR_SLICE_START_PTR(slice); uint8_t *const end = GPR_SLICE_END_PTR(slice); uint8_t *cur = beg; - grpc_chttp2_data_parser *p = parser; uint32_t message_flags; grpc_chttp2_incoming_byte_stream *incoming_byte_stream; char *msg; - if (is_last && p->is_last_frame) { - grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, - GRPC_ERROR_NONE); - } - if (cur == end) { return GRPC_ERROR_NONE; } @@ -272,3 +266,18 @@ grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here")); } + +grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + gpr_slice slice, int is_last) { + grpc_chttp2_data_parser *p = parser; + grpc_error *error = parse_inner(exec_ctx, p, t, s, slice); + + if (is_last && p->is_last_frame) { + grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, + GRPC_ERROR_NONE); + } + + return error; +} diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index bd26b81622..8180f78fc0 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1607,7 +1607,8 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, if (s->header_frames_received == GPR_ARRAY_SIZE(s->metadata_buffer)) { return GRPC_ERROR_CREATE("Too many trailer frames"); } - s->published_metadata[s->header_frames_received] = true; + s->published_metadata[s->header_frames_received] = + GRPC_METADATA_PUBLISHED_FROM_WIRE; maybe_complete_funcs[s->header_frames_received](exec_ctx, t, s); s->header_frames_received++; } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 1ca2f7a70a..27acf6321b 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -317,6 +317,13 @@ struct grpc_chttp2_transport { grpc_chttp2_write_cb *write_cb_pool; }; +typedef enum { + GRPC_METADATA_NOT_PUBLISHED, + GRPC_METADATA_SYNTHESIZED_FROM_FAKE, + GRPC_METADATA_PUBLISHED_FROM_WIRE, + GPRC_METADATA_PUBLISHED_AT_CLOSE +} grpc_published_metadata_method; + struct grpc_chttp2_stream { grpc_chttp2_transport *t; grpc_stream_refcount *refcount; @@ -370,8 +377,6 @@ struct grpc_chttp2_stream { bool read_closed; /** Are all published incoming byte streams closed. */ bool all_incoming_byte_streams_finished; - /** Is this stream in the stream map. */ - bool in_stream_map; /** Has this stream seen an error. If true, then pending incoming frames can be thrown away. */ bool seen_error; @@ -381,7 +386,7 @@ struct grpc_chttp2_stream { /** the error that resulted in this stream being write-closed */ grpc_error *write_closed_error; - bool published_metadata[2]; + grpc_published_metadata_method published_metadata[2]; bool final_metadata_requested; grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; diff --git a/src/core/ext/transport/chttp2/transport/stream_map.c b/src/core/ext/transport/chttp2/transport/stream_map.c index f70791c422..bd07412274 100644 --- a/src/core/ext/transport/chttp2/transport/stream_map.c +++ b/src/core/ext/transport/chttp2/transport/stream_map.c @@ -77,6 +77,7 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, uint32_t key, GPR_ASSERT(count == 0 || keys[count - 1] < key); GPR_ASSERT(value); + GPR_ASSERT(grpc_chttp2_stream_map_find(map, key) == NULL); if (count == capacity) { if (map->free > capacity / 4) { @@ -170,6 +171,7 @@ void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map, uint32_t key) { if (map->free == map->count) { map->free = map->count = 0; } + GPR_ASSERT(grpc_chttp2_stream_map_find(map, key) == NULL); } return out; } diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index b021b34523..ebede19a7f 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -339,7 +339,10 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { ClientContext context; Status s = stub->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), s.error_message().c_str()); + } + ASSERT_TRUE(s.ok()); } } diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 8062424a1f..ef54b4b766 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -130,6 +130,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); entry->set_value((UsageTimer::Now() - start) * 1e9); + if (!s.ok()) { + gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), s.error_message().c_str()); + } return s.ok(); } }; |