diff options
author | 2018-06-11 11:34:17 -0700 | |
---|---|---|
committer | 2018-06-11 16:49:13 -0700 | |
commit | 5b3824baf20ddd2a0046930a0c8e539eb7b41df6 (patch) | |
tree | 6e7e95bfbdca990de4ad43322dcbdb1ae97352c8 /src/core/ext | |
parent | 9846826b51ce20c1ecec6084898adbbf033b64c0 (diff) |
Code review changes.
Diffstat (limited to 'src/core/ext')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 95 |
1 files changed, 61 insertions, 34 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 98391a13ef..fe97228bbc 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1723,6 +1723,29 @@ static void recv_message_ready(void* arg, grpc_error* error) { // recv_trailing_metadata handling // +// Sets *status and *server_pushback_md based on batch_data and error. +static void get_call_status(subchannel_batch_data* batch_data, + grpc_error* error, grpc_status_code* status, + grpc_mdelem** server_pushback_md) { + grpc_call_element* elem = batch_data->elem; + call_data* calld = static_cast<call_data*>(elem->call_data); + if (error != GRPC_ERROR_NONE) { + grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, + nullptr); + } else { + grpc_metadata_batch* md_batch = + batch_data->batch.payload->recv_trailing_metadata + .recv_trailing_metadata; + GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); + *status = + grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); + if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; + } + } + GRPC_ERROR_UNREF(error); +} + // Adds recv_trailing_metadata_ready closure to closures. static void add_closure_for_recv_trailing_metadata_ready( grpc_call_element* elem, subchannel_batch_data* batch_data, @@ -1837,6 +1860,34 @@ static void add_closures_to_fail_unstarted_pending_batches( GRPC_ERROR_UNREF(error); } +// Runs necessary closures upon completion of a call attempt. +static void run_closures_for_completed_call(subchannel_batch_data* batch_data, + grpc_error* error) { + grpc_call_element* elem = batch_data->elem; + call_data* calld = static_cast<call_data*>(elem->call_data); + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); + // Construct list of closures to execute. + grpc_core::CallCombinerClosureList closures; + // First, add closure for recv_trailing_metadata_ready. + add_closure_for_recv_trailing_metadata_ready( + elem, batch_data, GRPC_ERROR_REF(error), &closures); + // If there are deferred recv_initial_metadata_ready or recv_message_ready + // callbacks, add them to closures. + add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures); + // Add closures to fail any pending batches that have not yet been started. + add_closures_to_fail_unstarted_pending_batches( + elem, retry_state, GRPC_ERROR_REF(error), &closures); + // Don't need batch_data anymore. + batch_data_unref(batch_data); + // Schedule all of the closures identified above. + // Note: This will release the call combiner. + closures.RunClosures(calld->call_combiner); + GRPC_ERROR_UNREF(error); +} + // Intercepts recv_trailing_metadata_ready callback for retries. // Commits the call and returns the trailing metadata up the stack. static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { @@ -1857,20 +1908,8 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { // Get the call's status and check for server pushback metadata. grpc_status_code status = GRPC_STATUS_OK; grpc_mdelem* server_pushback_md = nullptr; - if (error != GRPC_ERROR_NONE) { - grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, - nullptr); - } else { - grpc_metadata_batch* md_batch = - batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata; - GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); - status = - grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); - if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { - server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; - } - } + get_call_status(batch_data, GRPC_ERROR_REF(error), &status, + &server_pushback_md); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, calld, grpc_status_code_to_string(status)); @@ -1892,36 +1931,24 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { } // Not retrying, so commit the call. retry_commit(elem, retry_state); - // Construct list of closures to execute. - grpc_core::CallCombinerClosureList closures; - // First, add closure for recv_trailing_metadata_ready. - add_closure_for_recv_trailing_metadata_ready( - elem, batch_data, GRPC_ERROR_REF(error), &closures); - // If there are deferred recv_initial_metadata_ready or recv_message_ready - // callbacks, add them to closures. - add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures); - // Add closures to fail any pending batches that have not yet been started. - add_closures_to_fail_unstarted_pending_batches( - elem, retry_state, GRPC_ERROR_REF(error), &closures); - // Don't need batch_data anymore. - batch_data_unref(batch_data); - // Schedule all of the closures identified above. - // Note: This will release the call combiner. - closures.RunClosures(calld->call_combiner); + // Run any necessary closures. + run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error)); } // // on_complete callback handling // -// For any pending batch completed in batch_data, adds the necessary -// completion closures to closures. +// Adds the on_complete closure for the pending batch completed in +// batch_data to closures. static void add_closure_for_completed_pending_batch( grpc_call_element* elem, subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, grpc_error* error, grpc_core::CallCombinerClosureList* closures) { pending_batch* pending = pending_batch_find( elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) { + // Match the pending batch with the same set of send ops as the + // subchannel batch we've just completed. return batch->on_complete != nullptr && batch_data->batch.send_initial_metadata == batch->send_initial_metadata && @@ -2033,7 +2060,7 @@ static void on_complete(void* arg, grpc_error* error) { // Track number of pending subchannel send batches and determine if this // was the last one. --calld->num_pending_retriable_subchannel_send_batches; - const bool last_callback_complete = + const bool last_send_batch_complete = calld->num_pending_retriable_subchannel_send_batches == 0; // Don't need batch_data anymore. batch_data_unref(batch_data); @@ -2041,7 +2068,7 @@ static void on_complete(void* arg, grpc_error* error) { // Note: This yeilds the call combiner. closures.RunClosures(calld->call_combiner); // If this was the last subchannel send batch, unref the call stack. - if (last_callback_complete) { + if (last_send_batch_complete) { GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); } } |