aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-05-24 14:24:21 -0700
committerGravatar GitHub <noreply@github.com>2018-05-24 14:24:21 -0700
commit6f173799ab0f9bf5b8e33172f47e1d00676585b3 (patch)
treee4c088fa06dcae927c05d77b301750849b8c2a12
parent374a5e8bce3668c232e8d7f3fde674970148e5bc (diff)
parent4f9e003c2d93582732103123ffd024236ef2a3d4 (diff)
Merge pull request #15536 from markdroth/retry_call_stack_refcount_fix
Hold ref to call stack while replay batches are pending.
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc54
1 files changed, 54 insertions, 0 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index bc6f733e15..2b7ad3facc 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -911,6 +911,15 @@ typedef struct client_channel_call_data {
grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
grpc_timer retry_timer;
+ // The number of pending retriable subchannel batches containing send ops.
+ // We hold a ref to the call stack while this is non-zero, since replay
+ // batches may not complete until after all callbacks have been returned
+ // to the surface, and we need to make sure that the call is not destroyed
+ // until all of these batches have completed.
+ // Note that we actually only need to track replay batches, but it's
+ // easier to track all batches with send ops.
+ int num_pending_retriable_subchannel_send_batches;
+
// Cached data for retrying send ops.
// send_initial_metadata
bool seen_send_initial_metadata;
@@ -2075,7 +2084,22 @@ static void on_complete(void* arg, grpc_error* error) {
batch_data_unref(batch_data);
GRPC_ERROR_UNREF(retry_state->recv_message_error);
}
+ // Track number of pending subchannel send batches and determine if
+ // this was the last one.
+ bool last_callback_complete = false;
+ if (batch_data->batch.send_initial_metadata ||
+ batch_data->batch.send_message ||
+ batch_data->batch.send_trailing_metadata) {
+ --calld->num_pending_retriable_subchannel_send_batches;
+ last_callback_complete =
+ calld->num_pending_retriable_subchannel_send_batches == 0;
+ }
batch_data_unref(batch_data);
+ // If we just completed the last subchannel send batch, unref the
+ // call stack.
+ if (last_callback_complete) {
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
+ }
return;
}
// Not retrying, so commit the call.
@@ -2118,11 +2142,26 @@ static void on_complete(void* arg, grpc_error* error) {
add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
closures, &num_closures);
}
+ // Track number of pending subchannel send batches and determine if this
+ // was the last one.
+ bool last_callback_complete = false;
+ if (batch_data->batch.send_initial_metadata ||
+ batch_data->batch.send_message ||
+ batch_data->batch.send_trailing_metadata) {
+ --calld->num_pending_retriable_subchannel_send_batches;
+ last_callback_complete =
+ calld->num_pending_retriable_subchannel_send_batches == 0;
+ }
// Don't need batch_data anymore.
batch_data_unref(batch_data);
// Schedule all of the closures identified above.
+ // Note: This yeilds the call combiner.
execute_closures_in_call_combiner(elem, "on_complete", closures,
num_closures);
+ // If we just completed the last subchannel send batch, unref the call stack.
+ if (last_callback_complete) {
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
+ }
}
//
@@ -2507,6 +2546,15 @@ static void add_subchannel_batches_for_pending_batches(
}
add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
num_closures);
+ // Track number of pending subchannel send batches.
+ // If this is the first one, take a ref to the call stack.
+ if (batch->send_initial_metadata || batch->send_message ||
+ batch->send_trailing_metadata) {
+ if (calld->num_pending_retriable_subchannel_send_batches == 0) {
+ GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
+ }
+ ++calld->num_pending_retriable_subchannel_send_batches;
+ }
}
}
@@ -2534,6 +2582,12 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
if (replay_batch_data != nullptr) {
add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
&num_closures);
+ // Track number of pending subchannel send batches.
+ // If this is the first one, take a ref to the call stack.
+ if (calld->num_pending_retriable_subchannel_send_batches == 0) {
+ GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
+ }
+ ++calld->num_pending_retriable_subchannel_send_batches;
}
// Now add pending batches.
add_subchannel_batches_for_pending_batches(elem, retry_state, closures,