diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 54 |
1 files changed, 54 insertions, 0 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index bc6f733e15..2b7ad3facc 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -911,6 +911,15 @@ typedef struct client_channel_call_data { grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff; grpc_timer retry_timer; + // The number of pending retriable subchannel batches containing send ops. + // We hold a ref to the call stack while this is non-zero, since replay + // batches may not complete until after all callbacks have been returned + // to the surface, and we need to make sure that the call is not destroyed + // until all of these batches have completed. + // Note that we actually only need to track replay batches, but it's + // easier to track all batches with send ops. + int num_pending_retriable_subchannel_send_batches; + // Cached data for retrying send ops. // send_initial_metadata bool seen_send_initial_metadata; @@ -2075,7 +2084,22 @@ static void on_complete(void* arg, grpc_error* error) { batch_data_unref(batch_data); GRPC_ERROR_UNREF(retry_state->recv_message_error); } + // Track number of pending subchannel send batches and determine if + // this was the last one. + bool last_callback_complete = false; + if (batch_data->batch.send_initial_metadata || + batch_data->batch.send_message || + batch_data->batch.send_trailing_metadata) { + --calld->num_pending_retriable_subchannel_send_batches; + last_callback_complete = + calld->num_pending_retriable_subchannel_send_batches == 0; + } batch_data_unref(batch_data); + // If we just completed the last subchannel send batch, unref the + // call stack. + if (last_callback_complete) { + GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); + } return; } // Not retrying, so commit the call. @@ -2118,11 +2142,26 @@ static void on_complete(void* arg, grpc_error* error) { add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state, closures, &num_closures); } + // Track number of pending subchannel send batches and determine if this + // was the last one. + bool last_callback_complete = false; + if (batch_data->batch.send_initial_metadata || + batch_data->batch.send_message || + batch_data->batch.send_trailing_metadata) { + --calld->num_pending_retriable_subchannel_send_batches; + last_callback_complete = + calld->num_pending_retriable_subchannel_send_batches == 0; + } // Don't need batch_data anymore. batch_data_unref(batch_data); // Schedule all of the closures identified above. + // Note: This yeilds the call combiner. execute_closures_in_call_combiner(elem, "on_complete", closures, num_closures); + // If we just completed the last subchannel send batch, unref the call stack. + if (last_callback_complete) { + GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); + } } // @@ -2507,6 +2546,15 @@ static void add_subchannel_batches_for_pending_batches( } add_closure_for_subchannel_batch(calld, &batch_data->batch, closures, num_closures); + // Track number of pending subchannel send batches. + // If this is the first one, take a ref to the call stack. + if (batch->send_initial_metadata || batch->send_message || + batch->send_trailing_metadata) { + if (calld->num_pending_retriable_subchannel_send_batches == 0) { + GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); + } + ++calld->num_pending_retriable_subchannel_send_batches; + } } } @@ -2534,6 +2582,12 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { if (replay_batch_data != nullptr) { add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures, &num_closures); + // Track number of pending subchannel send batches. + // If this is the first one, take a ref to the call stack. + if (calld->num_pending_retriable_subchannel_send_batches == 0) { + GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); + } + ++calld->num_pending_retriable_subchannel_send_batches; } // Now add pending batches. add_subchannel_batches_for_pending_batches(elem, retry_state, closures, |