aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-06-11 11:34:17 -0700
committerGravatar Mark D. Roth <roth@google.com>2018-06-11 16:49:13 -0700
commit5b3824baf20ddd2a0046930a0c8e539eb7b41df6 (patch)
tree6e7e95bfbdca990de4ad43322dcbdb1ae97352c8 /src/core/ext
parent9846826b51ce20c1ecec6084898adbbf033b64c0 (diff)
Code review changes.
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc95
1 files changed, 61 insertions, 34 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 98391a13ef..fe97228bbc 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -1723,6 +1723,29 @@ static void recv_message_ready(void* arg, grpc_error* error) {
// recv_trailing_metadata handling
//
+// Sets *status and *server_pushback_md based on batch_data and error.
+static void get_call_status(subchannel_batch_data* batch_data,
+ grpc_error* error, grpc_status_code* status,
+ grpc_mdelem** server_pushback_md) {
+ grpc_call_element* elem = batch_data->elem;
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
+ nullptr);
+ } else {
+ grpc_metadata_batch* md_batch =
+ batch_data->batch.payload->recv_trailing_metadata
+ .recv_trailing_metadata;
+ GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+ *status =
+ grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
+ if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+ *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+ }
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
// Adds recv_trailing_metadata_ready closure to closures.
static void add_closure_for_recv_trailing_metadata_ready(
grpc_call_element* elem, subchannel_batch_data* batch_data,
@@ -1837,6 +1860,34 @@ static void add_closures_to_fail_unstarted_pending_batches(
GRPC_ERROR_UNREF(error);
}
+// Runs necessary closures upon completion of a call attempt.
+static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
+ grpc_error* error) {
+ grpc_call_element* elem = batch_data->elem;
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
+ // Construct list of closures to execute.
+ grpc_core::CallCombinerClosureList closures;
+ // First, add closure for recv_trailing_metadata_ready.
+ add_closure_for_recv_trailing_metadata_ready(
+ elem, batch_data, GRPC_ERROR_REF(error), &closures);
+ // If there are deferred recv_initial_metadata_ready or recv_message_ready
+ // callbacks, add them to closures.
+ add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
+ // Add closures to fail any pending batches that have not yet been started.
+ add_closures_to_fail_unstarted_pending_batches(
+ elem, retry_state, GRPC_ERROR_REF(error), &closures);
+ // Don't need batch_data anymore.
+ batch_data_unref(batch_data);
+ // Schedule all of the closures identified above.
+ // Note: This will release the call combiner.
+ closures.RunClosures(calld->call_combiner);
+ GRPC_ERROR_UNREF(error);
+}
+
// Intercepts recv_trailing_metadata_ready callback for retries.
// Commits the call and returns the trailing metadata up the stack.
static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
@@ -1857,20 +1908,8 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
// Get the call's status and check for server pushback metadata.
grpc_status_code status = GRPC_STATUS_OK;
grpc_mdelem* server_pushback_md = nullptr;
- if (error != GRPC_ERROR_NONE) {
- grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
- nullptr);
- } else {
- grpc_metadata_batch* md_batch =
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata;
- GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
- status =
- grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
- if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
- server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
- }
- }
+ get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
+ &server_pushback_md);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
calld, grpc_status_code_to_string(status));
@@ -1892,36 +1931,24 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
}
// Not retrying, so commit the call.
retry_commit(elem, retry_state);
- // Construct list of closures to execute.
- grpc_core::CallCombinerClosureList closures;
- // First, add closure for recv_trailing_metadata_ready.
- add_closure_for_recv_trailing_metadata_ready(
- elem, batch_data, GRPC_ERROR_REF(error), &closures);
- // If there are deferred recv_initial_metadata_ready or recv_message_ready
- // callbacks, add them to closures.
- add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
- // Add closures to fail any pending batches that have not yet been started.
- add_closures_to_fail_unstarted_pending_batches(
- elem, retry_state, GRPC_ERROR_REF(error), &closures);
- // Don't need batch_data anymore.
- batch_data_unref(batch_data);
- // Schedule all of the closures identified above.
- // Note: This will release the call combiner.
- closures.RunClosures(calld->call_combiner);
+ // Run any necessary closures.
+ run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
}
//
// on_complete callback handling
//
-// For any pending batch completed in batch_data, adds the necessary
-// completion closures to closures.
+// Adds the on_complete closure for the pending batch completed in
+// batch_data to closures.
static void add_closure_for_completed_pending_batch(
grpc_call_element* elem, subchannel_batch_data* batch_data,
subchannel_call_retry_state* retry_state, grpc_error* error,
grpc_core::CallCombinerClosureList* closures) {
pending_batch* pending = pending_batch_find(
elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
+ // Match the pending batch with the same set of send ops as the
+ // subchannel batch we've just completed.
return batch->on_complete != nullptr &&
batch_data->batch.send_initial_metadata ==
batch->send_initial_metadata &&
@@ -2033,7 +2060,7 @@ static void on_complete(void* arg, grpc_error* error) {
// Track number of pending subchannel send batches and determine if this
// was the last one.
--calld->num_pending_retriable_subchannel_send_batches;
- const bool last_callback_complete =
+ const bool last_send_batch_complete =
calld->num_pending_retriable_subchannel_send_batches == 0;
// Don't need batch_data anymore.
batch_data_unref(batch_data);
@@ -2041,7 +2068,7 @@ static void on_complete(void* arg, grpc_error* error) {
// Note: This yeilds the call combiner.
closures.RunClosures(calld->call_combiner);
// If this was the last subchannel send batch, unref the call stack.
- if (last_callback_complete) {
+ if (last_send_batch_complete) {
GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
}
}