diff options
author | 2016-02-11 18:16:11 -0800 | |
---|---|---|
committer | 2016-02-11 18:16:11 -0800 | |
commit | 3f19f427714c08d1fce449cf8904fa1494602285 (patch) | |
tree | f80a1816796fb93aaca615ffe3b3403500635113 /src | |
parent | cf36536a30786ce0ed3ad87a56f5f691d530a7b3 (diff) | |
parent | c0473cc8235ab94f38a9d9891e2352e76960fb96 (diff) |
Merge pull request #5052 from ctiller/compress
Fix race condition in transport API
Diffstat (limited to 'src')
-rw-r--r-- | src/core/census/grpc_filter.c | 4 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 4 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 4 | ||||
-rw-r--r-- | src/core/channel/subchannel_call_holder.c | 6 | ||||
-rw-r--r-- | src/core/security/server_auth_filter.c | 4 | ||||
-rw-r--r-- | src/core/surface/call.c | 154 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 3 | ||||
-rw-r--r-- | src/core/surface/server.c | 4 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 15 | ||||
-rw-r--r-- | src/core/transport/transport.c | 1 | ||||
-rw-r--r-- | src/core/transport/transport.h | 5 | ||||
-rw-r--r-- | src/ruby/spec/client_server_spec.rb | 3 |
13 files changed, 131 insertions, 78 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index a8db32b9d5..c8aaf31e2d 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -107,8 +107,8 @@ static void server_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata) { /* substitute our callback for the op callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->finish_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->finish_recv; } } diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 43eee046b8..1aa27208c2 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -127,8 +127,8 @@ static void hc_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata != NULL) { /* substitute our callback for the higher callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->hc_on_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->hc_on_recv; } } diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index bb75323933..370f8dbe42 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -186,8 +186,8 @@ static void hs_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata) { /* substitute our callback for the higher callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->hs_on_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->hs_on_recv; } } diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 3ad9fd9efb..81297c8d44 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -241,10 +241,8 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { size_t i; for (i = 0; i < holder->waiting_ops_count; i++) { - grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, false, - NULL); - grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready, - false, NULL); + grpc_transport_stream_op_finish_with_failure(exec_ctx, + &holder->waiting_ops[i]); } holder->waiting_ops_count = 0; } diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 4c78711387..3d8e5e8d35 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -176,8 +176,8 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem, if (op->recv_initial_metadata != NULL) { /* substitute our callback for the higher callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->auth_on_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->auth_on_recv; calld->transport_op = *op; } } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 9495e748b5..1b117aa6b8 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -159,6 +159,9 @@ struct grpc_call { uint8_t receiving_message; uint8_t received_final_op; + /* have we received initial metadata */ + bool has_initial_md_been_received; + batch_control active_batches[MAX_CONCURRENT_BATCHES]; /* first idx: is_receiving, second idx: is_trailing */ @@ -200,6 +203,7 @@ struct grpc_call { gpr_slice receiving_slice; grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; + grpc_closure receiving_initial_metadata_ready; uint32_t test_only_last_message_flags; union { @@ -212,6 +216,11 @@ struct grpc_call { int *cancelled; } server; } final_op; + + struct { + void *bctlp; + bool success; + } saved_receiving_stream_ready_ctx; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -993,6 +1002,94 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } +static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, + bool success) { + grpc_call *call = bctl->call; + if (call->receiving_stream == NULL) { + *call->receiving_buffer = NULL; + call->receiving_message = 0; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else if (call->receiving_stream->length > + grpc_channel_get_max_message_length(call->channel)) { + cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, + "Max message size exceeded"); + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + call->receiving_stream = NULL; + *call->receiving_buffer = NULL; + call->receiving_message = 0; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else { + call->test_only_last_message_flags = call->receiving_stream->flags; + if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && + (call->compression_algorithm > GRPC_COMPRESS_NONE)) { + *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( + NULL, 0, call->compression_algorithm); + } else { + *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); + } + grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, + bctl); + continue_receiving_slices(exec_ctx, bctl); + /* early out */ + return; + } +} + +static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, + bool success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + + gpr_mu_lock(&bctl->call->mu); + if (bctl->call->has_initial_md_been_received) { + gpr_mu_unlock(&bctl->call->mu); + process_data_after_md(exec_ctx, bctlp, success); + } else { + call->saved_receiving_stream_ready_ctx.bctlp = bctlp; + call->saved_receiving_stream_ready_ctx.success = success; + gpr_mu_unlock(&bctl->call->mu); + } +} + +static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, + void *bctlp, bool success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + + gpr_mu_lock(&call->mu); + + grpc_metadata_batch *md = + &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; + grpc_metadata_batch_filter(md, recv_initial_filter, call); + call->has_initial_md_been_received = true; + + if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != + 0 && + !call->is_client) { + GPR_TIMER_BEGIN("set_deadline_alarm", 0); + set_deadline_alarm(exec_ctx, call, md->deadline); + GPR_TIMER_END("set_deadline_alarm", 0); + } + + if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) { + grpc_closure *saved_rsr_closure = grpc_closure_create( + receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp); + grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, + call->saved_receiving_stream_ready_ctx.success, NULL); + call->saved_receiving_stream_ready_ctx.bctlp = NULL; + } + + gpr_mu_unlock(&call->mu); + + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } +} + static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; @@ -1011,19 +1108,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { grpc_metadata_batch_destroy( &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } - if (bctl->recv_initial_metadata) { - grpc_metadata_batch *md = - &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - grpc_metadata_batch_filter(md, recv_initial_filter, call); - - if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != - 0 && - !call->is_client) { - GPR_TIMER_BEGIN("set_deadline_alarm", 0); - set_deadline_alarm(exec_ctx, call, md->deadline); - GPR_TIMER_END("set_deadline_alarm", 0); - } - } if (bctl->recv_final_op) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; @@ -1065,45 +1149,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { } } -static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - bool success) { - batch_control *bctl = bctlp; - grpc_call *call = bctl->call; - - if (call->receiving_stream == NULL) { - *call->receiving_buffer = NULL; - call->receiving_message = 0; - if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); - } - } else if (call->receiving_stream->length > - grpc_channel_get_max_message_length(call->channel)) { - cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, - "Max message size exceeded"); - grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); - call->receiving_stream = NULL; - *call->receiving_buffer = NULL; - call->receiving_message = 0; - if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); - } - } else { - call->test_only_last_message_flags = call->receiving_stream->flags; - if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && - (call->compression_algorithm > GRPC_COMPRESS_NONE)) { - *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( - NULL, 0, call->compression_algorithm); - } else { - *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); - } - grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, - bctl); - continue_receiving_slices(exec_ctx, bctl); - /* early out */ - return; - } -} - static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, @@ -1273,9 +1318,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata; + grpc_closure_init(&call->receiving_initial_metadata_ready, + receiving_initial_metadata_ready, bctl); bctl->recv_initial_metadata = 1; stream_op.recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; + stream_op.recv_initial_metadata_ready = + &call->receiving_initial_metadata_ready; + num_completion_callbacks_needed++; break; case GRPC_OP_RECV_MESSAGE: /* Flag validation: currently allow no flags */ diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 705996cad3..537069e984 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -78,8 +78,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } else if (op->recv_trailing_metadata != NULL) { fill_metadata(elem, op->recv_trailing_metadata); } - grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); - grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 42cffccb4c..fb5e0d4b9e 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -596,8 +596,8 @@ static void server_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata != NULL) { calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv_initial_metadata = op->on_complete; - op->on_complete = &calld->server_on_recv_initial_metadata; + calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata; } } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index c611496e7e..0e1e2c4265 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -385,7 +385,7 @@ typedef struct { grpc_closure *send_trailing_metadata_finished; grpc_metadata_batch *recv_initial_metadata; - grpc_closure *recv_initial_metadata_finished; + grpc_closure *recv_initial_metadata_ready; grpc_byte_stream **recv_message; grpc_closure *recv_message_ready; grpc_metadata_batch *recv_trailing_metadata; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 9298573c7f..617d98875c 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -544,7 +544,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_ASSERT(s->global.send_initial_metadata_finished == NULL); GPR_ASSERT(s->global.send_message_finished == NULL); GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL); - GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL); + GPR_ASSERT(s->global.recv_initial_metadata_ready == NULL); GPR_ASSERT(s->global.recv_message_ready == NULL); GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL); grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser); @@ -863,9 +863,9 @@ static void perform_stream_op_locked( } if (op->recv_initial_metadata != NULL) { - GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL); - stream_global->recv_initial_metadata_finished = - add_closure_barrier(on_complete); + GPR_ASSERT(stream_global->recv_initial_metadata_ready == NULL); + stream_global->recv_initial_metadata_ready = + op->recv_initial_metadata_ready; stream_global->recv_initial_metadata = op->recv_initial_metadata; grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } @@ -1009,13 +1009,14 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs; while ( grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) { - if (stream_global->recv_initial_metadata_finished != NULL && + if (stream_global->recv_initial_metadata_ready != NULL && stream_global->published_initial_metadata) { grpc_chttp2_incoming_metadata_buffer_publish( &stream_global->received_initial_metadata, stream_global->recv_initial_metadata); - grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->recv_initial_metadata_finished, 1); + grpc_exec_ctx_enqueue( + exec_ctx, stream_global->recv_initial_metadata_ready, true, NULL); + stream_global->recv_initial_metadata_ready = NULL; } if (stream_global->recv_message_ready != NULL) { if (stream_global->incoming_frames.head != NULL) { diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 08d685668c..6e154b629a 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -126,6 +126,7 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_finish_with_failure( grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); + grpc_exec_ctx_enqueue(exec_ctx, op->recv_initial_metadata_ready, false, NULL); grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f5cac77adc..8902c5d2f6 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -92,6 +92,8 @@ typedef struct grpc_transport_stream_op { /** Receive initial metadata from the stream, into provided metadata batch. */ grpc_metadata_batch *recv_initial_metadata; + /** Should be enqueued when initial metadata is ready to be processed. */ + grpc_closure *recv_initial_metadata_ready; /** Receive message data from the stream, into provided byte stream. */ grpc_byte_stream **recv_message; @@ -103,7 +105,8 @@ typedef struct grpc_transport_stream_op { grpc_metadata_batch *recv_trailing_metadata; /** Should be enqueued when all requested operations (excluding recv_message - which has its own closure) in a given batch have been completed. */ + and recv_initial_metadata which have their own closures) in a given batch + have been completed. */ grpc_closure *on_complete; /** If != GRPC_STATUS_OK, cancel this stream */ diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index 594fda1cd3..7ef534571f 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -198,6 +198,7 @@ shared_examples 'basic GRPC message delivery is OK' do # confirm the client can receive the server response and status. client_ops = { CallOps::SEND_CLOSE_FROM_CLIENT => nil, + CallOps::RECV_INITIAL_METADATA => nil, CallOps::RECV_MESSAGE => nil, CallOps::RECV_STATUS_ON_CLIENT => nil } |