From a44cbfc11c7d018785ef5699b900453090df07e3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 3 Feb 2016 16:02:49 -0800 Subject: Fix race condition in transport API Specifically: Receiving trailing and initial metadata had to be published in lock-step. => If we wanted trailing metadata, we might not get initial metadata processed until messages arrived. => Compression code had no idea what codec to use. To fix it, publish initial metadata as soon as it's ready (this is a transport API change). Requires changes to grpc_call to ensure ordering in processing initial metadata and messages (one may be delayed). Exposed at least some bugs in C++ where we never read initial metadata. I expect at least one more similar bug. --- src/core/surface/call.c | 154 +++++++++++++++++++++++++++-------------- src/core/surface/lame_client.c | 3 +- src/core/surface/server.c | 4 +- 3 files changed, 105 insertions(+), 56 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 9495e748b5..1b117aa6b8 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -159,6 +159,9 @@ struct grpc_call { uint8_t receiving_message; uint8_t received_final_op; + /* have we received initial metadata */ + bool has_initial_md_been_received; + batch_control active_batches[MAX_CONCURRENT_BATCHES]; /* first idx: is_receiving, second idx: is_trailing */ @@ -200,6 +203,7 @@ struct grpc_call { gpr_slice receiving_slice; grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; + grpc_closure receiving_initial_metadata_ready; uint32_t test_only_last_message_flags; union { @@ -212,6 +216,11 @@ struct grpc_call { int *cancelled; } server; } final_op; + + struct { + void *bctlp; + bool success; + } saved_receiving_stream_ready_ctx; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -993,6 +1002,94 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } +static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, + bool success) { + grpc_call *call = bctl->call; + if (call->receiving_stream == NULL) { + *call->receiving_buffer = NULL; + call->receiving_message = 0; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else if (call->receiving_stream->length > + grpc_channel_get_max_message_length(call->channel)) { + cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, + "Max message size exceeded"); + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + call->receiving_stream = NULL; + *call->receiving_buffer = NULL; + call->receiving_message = 0; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else { + call->test_only_last_message_flags = call->receiving_stream->flags; + if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && + (call->compression_algorithm > GRPC_COMPRESS_NONE)) { + *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( + NULL, 0, call->compression_algorithm); + } else { + *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); + } + grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, + bctl); + continue_receiving_slices(exec_ctx, bctl); + /* early out */ + return; + } +} + +static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, + bool success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + + gpr_mu_lock(&bctl->call->mu); + if (bctl->call->has_initial_md_been_received) { + gpr_mu_unlock(&bctl->call->mu); + process_data_after_md(exec_ctx, bctlp, success); + } else { + call->saved_receiving_stream_ready_ctx.bctlp = bctlp; + call->saved_receiving_stream_ready_ctx.success = success; + gpr_mu_unlock(&bctl->call->mu); + } +} + +static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, + void *bctlp, bool success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + + gpr_mu_lock(&call->mu); + + grpc_metadata_batch *md = + &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; + grpc_metadata_batch_filter(md, recv_initial_filter, call); + call->has_initial_md_been_received = true; + + if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != + 0 && + !call->is_client) { + GPR_TIMER_BEGIN("set_deadline_alarm", 0); + set_deadline_alarm(exec_ctx, call, md->deadline); + GPR_TIMER_END("set_deadline_alarm", 0); + } + + if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) { + grpc_closure *saved_rsr_closure = grpc_closure_create( + receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp); + grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, + call->saved_receiving_stream_ready_ctx.success, NULL); + call->saved_receiving_stream_ready_ctx.bctlp = NULL; + } + + gpr_mu_unlock(&call->mu); + + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } +} + static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; @@ -1011,19 +1108,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { grpc_metadata_batch_destroy( &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } - if (bctl->recv_initial_metadata) { - grpc_metadata_batch *md = - &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - grpc_metadata_batch_filter(md, recv_initial_filter, call); - - if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != - 0 && - !call->is_client) { - GPR_TIMER_BEGIN("set_deadline_alarm", 0); - set_deadline_alarm(exec_ctx, call, md->deadline); - GPR_TIMER_END("set_deadline_alarm", 0); - } - } if (bctl->recv_final_op) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; @@ -1065,45 +1149,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { } } -static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - bool success) { - batch_control *bctl = bctlp; - grpc_call *call = bctl->call; - - if (call->receiving_stream == NULL) { - *call->receiving_buffer = NULL; - call->receiving_message = 0; - if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); - } - } else if (call->receiving_stream->length > - grpc_channel_get_max_message_length(call->channel)) { - cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, - "Max message size exceeded"); - grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); - call->receiving_stream = NULL; - *call->receiving_buffer = NULL; - call->receiving_message = 0; - if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); - } - } else { - call->test_only_last_message_flags = call->receiving_stream->flags; - if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && - (call->compression_algorithm > GRPC_COMPRESS_NONE)) { - *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( - NULL, 0, call->compression_algorithm); - } else { - *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); - } - grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, - bctl); - continue_receiving_slices(exec_ctx, bctl); - /* early out */ - return; - } -} - static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, @@ -1273,9 +1318,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata; + grpc_closure_init(&call->receiving_initial_metadata_ready, + receiving_initial_metadata_ready, bctl); bctl->recv_initial_metadata = 1; stream_op.recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; + stream_op.recv_initial_metadata_ready = + &call->receiving_initial_metadata_ready; + num_completion_callbacks_needed++; break; case GRPC_OP_RECV_MESSAGE: /* Flag validation: currently allow no flags */ diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 705996cad3..537069e984 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -78,8 +78,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } else if (op->recv_trailing_metadata != NULL) { fill_metadata(elem, op->recv_trailing_metadata); } - grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); - grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 42cffccb4c..fb5e0d4b9e 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -596,8 +596,8 @@ static void server_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata != NULL) { calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv_initial_metadata = op->on_complete; - op->on_complete = &calld->server_on_recv_initial_metadata; + calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata; } } -- cgit v1.2.3 From 6f87164f3c3bfd918a5bf571e2da6ec73cb9ea57 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 3 Feb 2016 16:15:31 -0800 Subject: Properly handle "." in metadata --- src/core/surface/validate_metadata.c | 2 +- test/cpp/end2end/async_end2end_test.cc | 4 ++++ tools/codegen/core/gen_legal_metadata_characters.c | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/surface/validate_metadata.c b/src/core/surface/validate_metadata.c index df2e80b4b7..bf4126867f 100644 --- a/src/core/surface/validate_metadata.c +++ b/src/core/surface/validate_metadata.c @@ -50,7 +50,7 @@ static int conforms_to(const char *s, size_t len, const uint8_t *legal_bits) { int grpc_header_key_is_legal(const char *key, size_t length) { static const uint8_t legal_header_bits[256 / 8] = { - 0x00, 0x00, 0x00, 0x00, 0x00, 0x20, 0xff, 0x03, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x60, 0xff, 0x03, 0x00, 0x00, 0x00, 0x80, 0xfe, 0xff, 0xff, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; if (length == 0) { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 252bda3798..a194c615cd 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -479,8 +479,10 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { send_request.set_message("Hello"); std::pair meta1("key1", "val1"); std::pair meta2("key2", "val2"); + std::pair meta3("g.r.d-bin", "xyz"); cli_ctx.AddMetadata(meta1.first, meta1.second); cli_ctx.AddMetadata(meta2.first, meta2.second); + cli_ctx.AddMetadata(meta3.first, meta3.second); std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); @@ -494,6 +496,8 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { ToString(client_initial_metadata.find(meta1.first)->second)); EXPECT_EQ(meta2.second, ToString(client_initial_metadata.find(meta2.first)->second)); + EXPECT_EQ(meta3.second, + ToString(client_initial_metadata.find(meta3.first)->second)); EXPECT_GE(client_initial_metadata.size(), static_cast(2)); send_response.set_message(recv_request.message()); diff --git a/tools/codegen/core/gen_legal_metadata_characters.c b/tools/codegen/core/gen_legal_metadata_characters.c index 3c9e1c7619..6ac32656cb 100644 --- a/tools/codegen/core/gen_legal_metadata_characters.c +++ b/tools/codegen/core/gen_legal_metadata_characters.c @@ -52,7 +52,7 @@ static void legal(int x) { static void dump(void) { int i; - printf("static const gpr_uint8 legal_header_bits[256/8] = "); + printf("static const uint8_t legal_header_bits[256/8] = "); for (i = 0; i < 256 / 8; i++) printf("%c 0x%02x", i ? ',' : '{', legal_bits[i]); printf(" };\n"); -- cgit v1.2.3 From 090c867e28c6898b5e5fe32d8fe5ef5178986721 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 11 Feb 2016 14:43:43 -0800 Subject: This testing exposed a race condition in alarm creation - the alarm needs to "begin" at the CQ before the timer should be inited. --- src/core/surface/alarm.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/surface') diff --git a/src/core/surface/alarm.c b/src/core/surface/alarm.c index d753023ca9..fb496f6c47 100644 --- a/src/core/surface/alarm.c +++ b/src/core/surface/alarm.c @@ -63,9 +63,9 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, alarm->cq = cq; alarm->tag = tag; + grpc_cq_begin_op(cq, tag); grpc_timer_init(&exec_ctx, &alarm->alarm, deadline, alarm_cb, alarm, gpr_now(GPR_CLOCK_MONOTONIC)); - grpc_cq_begin_op(cq, tag); grpc_exec_ctx_finish(&exec_ctx); return alarm; } -- cgit v1.2.3