From 5bacf2e4f6c77a2c90f41b6f6c2b62321e5ce432 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 19 Jun 2018 08:19:58 -0700 Subject: Allocate retry payload fields with subchannel call instead of with each batch. --- .../ext/filters/client_channel/client_channel.cc | 129 ++++++++++++--------- 1 file changed, 72 insertions(+), 57 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 34ea97e23e..520431e63b 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -794,6 +794,15 @@ typedef struct { // The batch to use in the subchannel call. // Its payload field points to subchannel_call_retry_state.batch_payload. grpc_transport_stream_op_batch batch; + // For intercepting on_complete. + grpc_closure on_complete; +} subchannel_batch_data; + +// Retry state associated with a subchannel call. +// Stored in the parent_data of the subchannel call object. +typedef struct { + // subchannel_batch_data.batch.payload points to this. + grpc_transport_stream_op_batch_payload batch_payload; // For send_initial_metadata. // Note that we need to make a copy of the initial metadata for each // subchannel call instead of just referring to the copy in call_data, @@ -818,15 +827,6 @@ typedef struct { grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; grpc_closure recv_trailing_metadata_ready; - // For intercepting on_complete. - grpc_closure on_complete; -} subchannel_batch_data; - -// Retry state associated with a subchannel call. -// Stored in the parent_data of the subchannel call object. -typedef struct { - // subchannel_batch_data.batch.payload points to this. - grpc_transport_stream_op_batch_payload batch_payload; // These fields indicate which ops have been started and completed on // this subchannel call. size_t started_send_message_count; @@ -1524,17 +1524,21 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem, static void batch_data_unref(subchannel_batch_data* batch_data) { if (gpr_unref(&batch_data->refs)) { - if (batch_data->send_initial_metadata_storage != nullptr) { - grpc_metadata_batch_destroy(&batch_data->send_initial_metadata); + subchannel_call_retry_state* retry_state = + static_cast( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); + if (batch_data->batch.send_initial_metadata) { + grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); } - if (batch_data->send_trailing_metadata_storage != nullptr) { - grpc_metadata_batch_destroy(&batch_data->send_trailing_metadata); + if (batch_data->batch.send_trailing_metadata) { + grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata); } if (batch_data->batch.recv_initial_metadata) { - grpc_metadata_batch_destroy(&batch_data->recv_initial_metadata); + grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata); } if (batch_data->batch.recv_trailing_metadata) { - grpc_metadata_batch_destroy(&batch_data->recv_trailing_metadata); + grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); } GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref"); call_data* calld = static_cast(batch_data->elem->call_data); @@ -1560,8 +1564,12 @@ static void invoke_recv_initial_metadata_callback(void* arg, }); GPR_ASSERT(pending != nullptr); // Return metadata. + subchannel_call_retry_state* retry_state = + static_cast( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); grpc_metadata_batch_move( - &batch_data->recv_initial_metadata, + &retry_state->recv_initial_metadata, pending->batch->payload->recv_initial_metadata.recv_initial_metadata); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking @@ -1606,7 +1614,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { // the recv_trailing_metadata_ready callback, then defer propagating this // callback back to the surface. We can evaluate whether to retry when // recv_trailing_metadata comes back. - if (GPR_UNLIKELY((batch_data->trailing_metadata_available || + if (GPR_UNLIKELY((retry_state->trailing_metadata_available || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { if (grpc_client_channel_trace.enabled()) { @@ -1651,8 +1659,12 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) { }); GPR_ASSERT(pending != nullptr); // Return payload. + subchannel_call_retry_state* retry_state = + static_cast( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); *pending->batch->payload->recv_message.recv_message = - std::move(batch_data->recv_message); + std::move(retry_state->recv_message); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking // the callback will result in yielding the call combiner. @@ -1693,7 +1705,7 @@ static void recv_message_ready(void* arg, grpc_error* error) { // callback back to the surface. We can evaluate whether to retry when // recv_trailing_metadata comes back. if (GPR_UNLIKELY( - (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) && + (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, @@ -1766,8 +1778,12 @@ static void add_closure_for_recv_trailing_metadata_ready( return; } // Return metadata. + subchannel_call_retry_state* retry_state = + static_cast( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); grpc_metadata_batch_move( - &batch_data->recv_trailing_metadata, + &retry_state->recv_trailing_metadata, pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); // Add closure. closures->Add(pending->batch->payload->recv_trailing_metadata @@ -1788,11 +1804,11 @@ static void add_closures_for_deferred_recv_callbacks( // Add closure for deferred recv_initial_metadata_ready. if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != nullptr)) { - GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, + GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, invoke_recv_initial_metadata_callback, retry_state->recv_initial_metadata_ready_deferred_batch, grpc_schedule_on_exec_ctx); - closures->Add(&batch_data->recv_initial_metadata_ready, + closures->Add(&retry_state->recv_initial_metadata_ready, retry_state->recv_initial_metadata_error, "resuming recv_initial_metadata_ready"); retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; @@ -1800,11 +1816,11 @@ static void add_closures_for_deferred_recv_callbacks( // Add closure for deferred recv_message_ready. if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch != nullptr)) { - GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, + GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, invoke_recv_message_callback, retry_state->recv_message_ready_deferred_batch, grpc_schedule_on_exec_ctx); - closures->Add(&batch_data->recv_message_ready, + closures->Add(&retry_state->recv_message_ready, retry_state->recv_message_error, "resuming recv_message_ready"); retry_state->recv_message_ready_deferred_batch = nullptr; @@ -2120,28 +2136,28 @@ static void add_retriable_send_initial_metadata_op( // // If we've already completed one or more attempts, add the // grpc-retry-attempts header. - batch_data->send_initial_metadata_storage = + retry_state->send_initial_metadata_storage = static_cast(gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * (calld->send_initial_metadata.list.count + (calld->num_attempts_completed > 0)))); grpc_metadata_batch_copy(&calld->send_initial_metadata, - &batch_data->send_initial_metadata, - batch_data->send_initial_metadata_storage); - if (GPR_UNLIKELY(batch_data->send_initial_metadata.idx.named + &retry_state->send_initial_metadata, + retry_state->send_initial_metadata_storage); + if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named .grpc_previous_rpc_attempts != nullptr)) { - grpc_metadata_batch_remove( - &batch_data->send_initial_metadata, - batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts); + grpc_metadata_batch_remove(&retry_state->send_initial_metadata, + retry_state->send_initial_metadata.idx.named + .grpc_previous_rpc_attempts); } if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) { grpc_mdelem retry_md = grpc_mdelem_from_slices( GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS, *retry_count_strings[calld->num_attempts_completed - 1]); grpc_error* error = grpc_metadata_batch_add_tail( - &batch_data->send_initial_metadata, - &batch_data->send_initial_metadata_storage[calld->send_initial_metadata - .list.count], + &retry_state->send_initial_metadata, + &retry_state->send_initial_metadata_storage[calld->send_initial_metadata + .list.count], retry_md); if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { gpr_log(GPR_ERROR, "error adding retry metadata: %s", @@ -2152,7 +2168,7 @@ static void add_retriable_send_initial_metadata_op( retry_state->started_send_initial_metadata = true; batch_data->batch.send_initial_metadata = true; batch_data->batch.payload->send_initial_metadata.send_initial_metadata = - &batch_data->send_initial_metadata; + &retry_state->send_initial_metadata; batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags = calld->send_initial_metadata_flags; batch_data->batch.payload->send_initial_metadata.peer_string = @@ -2173,10 +2189,10 @@ static void add_retriable_send_message_op( grpc_core::ByteStreamCache* cache = (*calld->send_messages)[retry_state->started_send_message_count]; ++retry_state->started_send_message_count; - batch_data->send_message.Init(cache); + retry_state->send_message.Init(cache); batch_data->batch.send_message = true; batch_data->batch.payload->send_message.send_message.reset( - batch_data->send_message.get()); + retry_state->send_message.get()); } // Adds retriable send_trailing_metadata op to batch_data. @@ -2186,17 +2202,17 @@ static void add_retriable_send_trailing_metadata_op( // We need to make a copy of the metadata batch for each attempt, since // the filters in the subchannel stack may modify this batch, and we don't // want those modifications to be passed forward to subsequent attempts. - batch_data->send_trailing_metadata_storage = + retry_state->send_trailing_metadata_storage = static_cast(gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * calld->send_trailing_metadata.list.count)); grpc_metadata_batch_copy(&calld->send_trailing_metadata, - &batch_data->send_trailing_metadata, - batch_data->send_trailing_metadata_storage); + &retry_state->send_trailing_metadata, + retry_state->send_trailing_metadata_storage); retry_state->started_send_trailing_metadata = true; batch_data->batch.send_trailing_metadata = true; batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata = - &batch_data->send_trailing_metadata; + &retry_state->send_trailing_metadata; } // Adds retriable recv_initial_metadata op to batch_data. @@ -2205,16 +2221,16 @@ static void add_retriable_recv_initial_metadata_op( subchannel_batch_data* batch_data) { retry_state->started_recv_initial_metadata = true; batch_data->batch.recv_initial_metadata = true; - grpc_metadata_batch_init(&batch_data->recv_initial_metadata); + grpc_metadata_batch_init(&retry_state->recv_initial_metadata); batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata = - &batch_data->recv_initial_metadata; + &retry_state->recv_initial_metadata; batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available = - &batch_data->trailing_metadata_available; - GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, + &retry_state->trailing_metadata_available; + GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, recv_initial_metadata_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready = - &batch_data->recv_initial_metadata_ready; + &retry_state->recv_initial_metadata_ready; } // Adds retriable recv_message op to batch_data. @@ -2224,11 +2240,11 @@ static void add_retriable_recv_message_op( ++retry_state->started_recv_message_count; batch_data->batch.recv_message = true; batch_data->batch.payload->recv_message.recv_message = - &batch_data->recv_message; - GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, recv_message_ready, + &retry_state->recv_message; + GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_message.recv_message_ready = - &batch_data->recv_message_ready; + &retry_state->recv_message_ready; } // Adds retriable recv_trailing_metadata op to batch_data. @@ -2237,16 +2253,17 @@ static void add_retriable_recv_trailing_metadata_op( subchannel_batch_data* batch_data) { retry_state->started_recv_trailing_metadata = true; batch_data->batch.recv_trailing_metadata = true; - grpc_metadata_batch_init(&batch_data->recv_trailing_metadata); + grpc_metadata_batch_init(&retry_state->recv_trailing_metadata); batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata = - &batch_data->recv_trailing_metadata; + &retry_state->recv_trailing_metadata; batch_data->batch.payload->recv_trailing_metadata.collect_stats = - &batch_data->collect_stats; - GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready, + &retry_state->collect_stats; + GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready, recv_trailing_metadata_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready; + .recv_trailing_metadata_ready = + &retry_state->recv_trailing_metadata_ready; } // Helper function used to start a recv_trailing_metadata batch. This @@ -2400,11 +2417,9 @@ static void add_subchannel_batches_for_pending_batches( // started subchannel batch, since we'll propagate the // completion when it completes. if (retry_state->completed_recv_trailing_metadata) { - subchannel_batch_data* batch_data = - retry_state->recv_trailing_metadata_internal_batch; // Batches containing recv_trailing_metadata always succeed. closures->Add( - &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE, + &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE, "re-executing recv_trailing_metadata_ready to propagate " "internally triggered result"); } else { -- cgit v1.2.3