diff options
author | 2018-06-13 14:49:25 -0700 | |
---|---|---|
committer | 2018-06-13 14:49:25 -0700 | |
commit | 0159026111b4dac15ddd228baee3757554898235 (patch) | |
tree | ee14b853ed3196219b1a2b13b18cf0b5c929f73d /src/core/ext/filters/client_channel/client_channel.cc | |
parent | 057fd3f57515e373ca4d220f48d1550c45eb7709 (diff) |
Revert "Second attempt: move recv_trailing_metadata into its own callback, don't use on_complete for recv_ops"
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 891 |
1 files changed, 493 insertions, 398 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 34ea97e23e..ea6775a8d8 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -817,7 +817,6 @@ typedef struct { // For intercepting recv_trailing_metadata. grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; - grpc_closure recv_trailing_metadata_ready; // For intercepting on_complete. grpc_closure on_complete; } subchannel_batch_data; @@ -1193,24 +1192,35 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error, "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", elem->channel_data, calld, num_batches, grpc_error_string(error)); } - grpc_core::CallCombinerClosureList closures; + grpc_transport_stream_op_batch* + batches[GPR_ARRAY_SIZE(calld->pending_batches)]; + size_t num_batches = 0; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - batch->handler_private.extra_arg = calld; - GRPC_CLOSURE_INIT(&batch->handler_private.closure, - fail_pending_batch_in_call_combiner, batch, - grpc_schedule_on_exec_ctx); - closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error), - "pending_batches_fail"); + batches[num_batches++] = batch; pending_batch_clear(calld, pending); } } + for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) { + grpc_transport_stream_op_batch* batch = batches[i]; + batch->handler_private.extra_arg = calld; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + fail_pending_batch_in_call_combiner, batch, + grpc_schedule_on_exec_ctx); + GRPC_CALL_COMBINER_START(calld->call_combiner, + &batch->handler_private.closure, + GRPC_ERROR_REF(error), "pending_batches_fail"); + } if (yield_call_combiner) { - closures.RunClosures(calld->call_combiner); - } else { - closures.RunClosuresWithoutYielding(calld->call_combiner); + if (num_batches > 0) { + // Note: This will release the call combiner. + grpc_transport_stream_op_batch_finish_with_failure( + batches[0], GRPC_ERROR_REF(error), calld->call_combiner); + } else { + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail"); + } } GRPC_ERROR_UNREF(error); } @@ -1245,22 +1255,30 @@ static void pending_batches_resume(grpc_call_element* elem) { " pending batches on subchannel_call=%p", chand, calld, num_batches, calld->subchannel_call); } - grpc_core::CallCombinerClosureList closures; + grpc_transport_stream_op_batch* + batches[GPR_ARRAY_SIZE(calld->pending_batches)]; + size_t num_batches = 0; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - batch->handler_private.extra_arg = calld->subchannel_call; - GRPC_CLOSURE_INIT(&batch->handler_private.closure, - resume_pending_batch_in_call_combiner, batch, - grpc_schedule_on_exec_ctx); - closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, - "pending_batches_resume"); + batches[num_batches++] = batch; pending_batch_clear(calld, pending); } } + for (size_t i = 1; i < num_batches; ++i) { + grpc_transport_stream_op_batch* batch = batches[i]; + batch->handler_private.extra_arg = calld->subchannel_call; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + resume_pending_batch_in_call_combiner, batch, + grpc_schedule_on_exec_ctx); + GRPC_CALL_COMBINER_START(calld->call_combiner, + &batch->handler_private.closure, GRPC_ERROR_NONE, + "pending_batches_resume"); + } + GPR_ASSERT(num_batches > 0); // Note: This will release the call combiner. - closures.RunClosures(calld->call_combiner); + grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]); } static void maybe_clear_pending_batch(grpc_call_element* elem, @@ -1275,10 +1293,7 @@ static void maybe_clear_pending_batch(grpc_call_element* elem, batch->payload->recv_initial_metadata.recv_initial_metadata_ready == nullptr) && (!batch->recv_message || - batch->payload->recv_message.recv_message_ready == nullptr) && - (!batch->recv_trailing_metadata || - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready == - nullptr)) { + batch->payload->recv_message.recv_message_ready == nullptr)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand, calld); @@ -1287,27 +1302,75 @@ static void maybe_clear_pending_batch(grpc_call_element* elem, } } -// Returns a pointer to the first pending batch for which predicate(batch) -// returns true, or null if not found. -template <typename Predicate> -static pending_batch* pending_batch_find(grpc_call_element* elem, - const char* log_message, - Predicate predicate) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { - pending_batch* pending = &calld->pending_batches[i]; - grpc_transport_stream_op_batch* batch = pending->batch; - if (batch != nullptr && predicate(batch)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand, - calld, log_message, i); - } - return pending; - } +// Returns true if all ops in the pending batch have been completed. +static bool pending_batch_is_completed( + pending_batch* pending, call_data* calld, + subchannel_call_retry_state* retry_state) { + if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { + return false; + } + if (pending->batch->send_initial_metadata && + !retry_state->completed_send_initial_metadata) { + return false; + } + if (pending->batch->send_message && + retry_state->completed_send_message_count < + calld->send_messages->size()) { + return false; + } + if (pending->batch->send_trailing_metadata && + !retry_state->completed_send_trailing_metadata) { + return false; + } + if (pending->batch->recv_initial_metadata && + !retry_state->completed_recv_initial_metadata) { + return false; + } + if (pending->batch->recv_message && + retry_state->completed_recv_message_count < + retry_state->started_recv_message_count) { + return false; + } + if (pending->batch->recv_trailing_metadata && + !retry_state->completed_recv_trailing_metadata) { + return false; } - return nullptr; + return true; +} + +// Returns true if any op in the batch was not yet started. +static bool pending_batch_is_unstarted( + pending_batch* pending, call_data* calld, + subchannel_call_retry_state* retry_state) { + if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { + return false; + } + if (pending->batch->send_initial_metadata && + !retry_state->started_send_initial_metadata) { + return true; + } + if (pending->batch->send_message && + retry_state->started_send_message_count < calld->send_messages->size()) { + return true; + } + if (pending->batch->send_trailing_metadata && + !retry_state->started_send_trailing_metadata) { + return true; + } + if (pending->batch->recv_initial_metadata && + !retry_state->started_recv_initial_metadata) { + return true; + } + if (pending->batch->recv_message && + retry_state->completed_recv_message_count == + retry_state->started_recv_message_count) { + return true; + } + if (pending->batch->recv_trailing_metadata && + !retry_state->started_recv_trailing_metadata) { + return true; + } + return false; } // @@ -1494,13 +1557,8 @@ static bool maybe_retry(grpc_call_element* elem, // subchannel_batch_data // -// Creates a subchannel_batch_data object on the call's arena with the -// specified refcount. If set_on_complete is true, the batch's -// on_complete callback will be set to point to on_complete(); -// otherwise, the batch's on_complete callback will be null. static subchannel_batch_data* batch_data_create(grpc_call_element* elem, - int refcount, - bool set_on_complete) { + int refcount) { call_data* calld = static_cast<call_data*>(elem->call_data); subchannel_call_retry_state* retry_state = static_cast<subchannel_call_retry_state*>( @@ -1513,11 +1571,9 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem, GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create"); batch_data->batch.payload = &retry_state->batch_payload; gpr_ref_init(&batch_data->refs, refcount); - if (set_on_complete) { - GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data, - grpc_schedule_on_exec_ctx); - batch_data->batch.on_complete = &batch_data->on_complete; - } + GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data, + grpc_schedule_on_exec_ctx); + batch_data->batch.on_complete = &batch_data->on_complete; GRPC_CALL_STACK_REF(calld->owning_call, "batch_data"); return batch_data; } @@ -1550,14 +1606,26 @@ static void batch_data_unref(subchannel_batch_data* batch_data) { static void invoke_recv_initial_metadata_callback(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); + channel_data* chand = + static_cast<channel_data*>(batch_data->elem->channel_data); + call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); // Find pending batch. - pending_batch* pending = pending_batch_find( - batch_data->elem, "invoking recv_initial_metadata_ready for", - [](grpc_transport_stream_op_batch* batch) { - return batch->recv_initial_metadata && - batch->payload->recv_initial_metadata - .recv_initial_metadata_ready != nullptr; - }); + pending_batch* pending = nullptr; + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch; + if (batch != nullptr && batch->recv_initial_metadata && + batch->payload->recv_initial_metadata.recv_initial_metadata_ready != + nullptr) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: invoking recv_initial_metadata_ready for " + "pending batch at index %" PRIuPTR, + chand, calld, i); + } + pending = &calld->pending_batches[i]; + break; + } + } GPR_ASSERT(pending != nullptr); // Return metadata. grpc_metadata_batch_move( @@ -1593,19 +1661,10 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); - retry_state->completed_recv_initial_metadata = true; - // If a retry was already dispatched, then we're not going to use the - // result of this recv_initial_metadata op, so do nothing. - if (retry_state->retry_dispatched) { - GRPC_CALL_COMBINER_STOP( - calld->call_combiner, - "recv_initial_metadata_ready after retry dispatched"); - return; - } // If we got an error or a Trailers-Only response and have not yet gotten - // the recv_trailing_metadata_ready callback, then defer propagating this - // callback back to the surface. We can evaluate whether to retry when - // recv_trailing_metadata comes back. + // the recv_trailing_metadata on_complete callback, then defer + // propagating this callback back to the surface. We can evaluate whether + // to retry when recv_trailing_metadata comes back. if (GPR_UNLIKELY((batch_data->trailing_metadata_available || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { @@ -1630,9 +1689,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { } // Received valid initial metadata, so commit the call. retry_commit(elem, retry_state); - // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. invoke_recv_initial_metadata_callback(batch_data, error); + GRPC_ERROR_UNREF(error); } // @@ -1642,13 +1701,25 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { // Invokes recv_message_ready for a subchannel batch. static void invoke_recv_message_callback(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); + channel_data* chand = + static_cast<channel_data*>(batch_data->elem->channel_data); + call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); // Find pending op. - pending_batch* pending = pending_batch_find( - batch_data->elem, "invoking recv_message_ready for", - [](grpc_transport_stream_op_batch* batch) { - return batch->recv_message && - batch->payload->recv_message.recv_message_ready != nullptr; - }); + pending_batch* pending = nullptr; + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch; + if (batch != nullptr && batch->recv_message && + batch->payload->recv_message.recv_message_ready != nullptr) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: invoking recv_message_ready for " + "pending batch at index %" PRIuPTR, + chand, calld, i); + } + pending = &calld->pending_batches[i]; + break; + } + } GPR_ASSERT(pending != nullptr); // Return payload. *pending->batch->payload->recv_message.recv_message = @@ -1680,18 +1751,10 @@ static void recv_message_ready(void* arg, grpc_error* error) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); - ++retry_state->completed_recv_message_count; - // If a retry was already dispatched, then we're not going to use the - // result of this recv_message op, so do nothing. - if (retry_state->retry_dispatched) { - GRPC_CALL_COMBINER_STOP(calld->call_combiner, - "recv_message_ready after retry dispatched"); - return; - } // If we got an error or the payload was nullptr and we have not yet gotten - // the recv_trailing_metadata_ready callback, then defer propagating this - // callback back to the surface. We can evaluate whether to retry when - // recv_trailing_metadata comes back. + // the recv_trailing_metadata on_complete callback, then defer + // propagating this callback back to the surface. We can evaluate whether + // to retry when recv_trailing_metadata comes back. if (GPR_UNLIKELY( (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { @@ -1714,268 +1777,133 @@ static void recv_message_ready(void* arg, grpc_error* error) { } // Received a valid message, so commit the call. retry_commit(elem, retry_state); - // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. invoke_recv_message_callback(batch_data, error); + GRPC_ERROR_UNREF(error); } // -// recv_trailing_metadata handling +// list of closures to execute in call combiner // -// Sets *status and *server_pushback_md based on batch_data and error. -static void get_call_status(subchannel_batch_data* batch_data, - grpc_error* error, grpc_status_code* status, - grpc_mdelem** server_pushback_md) { - grpc_call_element* elem = batch_data->elem; +// Represents a closure that needs to run in the call combiner as part of +// starting or completing a batch. +typedef struct { + grpc_closure* closure; + grpc_error* error; + const char* reason; + bool free_reason = false; +} closure_to_execute; + +static void execute_closures_in_call_combiner(grpc_call_element* elem, + const char* caller, + closure_to_execute* closures, + size_t num_closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); - if (error != GRPC_ERROR_NONE) { - grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, - nullptr); + // Note that the call combiner will be yielded for each closure that + // we schedule. We're already running in the call combiner, so one of + // the closures can be scheduled directly, but the others will + // have to re-enter the call combiner. + if (num_closures > 0) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand, + calld, caller, closures[0].reason); + } + GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error); + if (closures[0].free_reason) { + gpr_free(const_cast<char*>(closures[0].reason)); + } + for (size_t i = 1; i < num_closures; ++i) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: %s starting closure in call combiner: %s", + chand, calld, caller, closures[i].reason); + } + GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure, + closures[i].error, closures[i].reason); + if (closures[i].free_reason) { + gpr_free(const_cast<char*>(closures[i].reason)); + } + } } else { - grpc_metadata_batch* md_batch = - batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata; - GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); - *status = - grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); - if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { - *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand, + calld, caller); } + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run"); } - GRPC_ERROR_UNREF(error); } -// Adds recv_trailing_metadata_ready closure to closures. -static void add_closure_for_recv_trailing_metadata_ready( - grpc_call_element* elem, subchannel_batch_data* batch_data, - grpc_error* error, grpc_core::CallCombinerClosureList* closures) { - // Find pending batch. - pending_batch* pending = pending_batch_find( - elem, "invoking recv_trailing_metadata for", - [](grpc_transport_stream_op_batch* batch) { - return batch->recv_trailing_metadata && - batch->payload->recv_trailing_metadata - .recv_trailing_metadata_ready != nullptr; - }); - // If we generated the recv_trailing_metadata op internally via - // start_internal_recv_trailing_metadata(), then there will be no - // pending batch. - if (pending == nullptr) { - GRPC_ERROR_UNREF(error); - return; +// +// on_complete callback handling +// + +// Updates retry_state to reflect the ops completed in batch_data. +static void update_retry_state_for_completed_batch( + subchannel_batch_data* batch_data, + subchannel_call_retry_state* retry_state) { + if (batch_data->batch.send_initial_metadata) { + retry_state->completed_send_initial_metadata = true; + } + if (batch_data->batch.send_message) { + ++retry_state->completed_send_message_count; + } + if (batch_data->batch.send_trailing_metadata) { + retry_state->completed_send_trailing_metadata = true; + } + if (batch_data->batch.recv_initial_metadata) { + retry_state->completed_recv_initial_metadata = true; + } + if (batch_data->batch.recv_message) { + ++retry_state->completed_recv_message_count; + } + if (batch_data->batch.recv_trailing_metadata) { + retry_state->completed_recv_trailing_metadata = true; } - // Return metadata. - grpc_metadata_batch_move( - &batch_data->recv_trailing_metadata, - pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); - // Add closure. - closures->Add(pending->batch->payload->recv_trailing_metadata - .recv_trailing_metadata_ready, - error, "recv_trailing_metadata_ready for pending batch"); - // Update bookkeeping. - pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - nullptr; - maybe_clear_pending_batch(elem, pending); } // Adds any necessary closures for deferred recv_initial_metadata and -// recv_message callbacks to closures. +// recv_message callbacks to closures, updating *num_closures as needed. static void add_closures_for_deferred_recv_callbacks( subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, - grpc_core::CallCombinerClosureList* closures) { + closure_to_execute* closures, size_t* num_closures) { if (batch_data->batch.recv_trailing_metadata) { // Add closure for deferred recv_initial_metadata_ready. if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != nullptr)) { - GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, - invoke_recv_initial_metadata_callback, - retry_state->recv_initial_metadata_ready_deferred_batch, - grpc_schedule_on_exec_ctx); - closures->Add(&batch_data->recv_initial_metadata_ready, - retry_state->recv_initial_metadata_error, - "resuming recv_initial_metadata_ready"); + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = GRPC_CLOSURE_INIT( + &batch_data->recv_initial_metadata_ready, + invoke_recv_initial_metadata_callback, + retry_state->recv_initial_metadata_ready_deferred_batch, + grpc_schedule_on_exec_ctx); + closure->error = retry_state->recv_initial_metadata_error; + closure->reason = "resuming recv_initial_metadata_ready"; retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; } // Add closure for deferred recv_message_ready. if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch != nullptr)) { - GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, - invoke_recv_message_callback, - retry_state->recv_message_ready_deferred_batch, - grpc_schedule_on_exec_ctx); - closures->Add(&batch_data->recv_message_ready, - retry_state->recv_message_error, - "resuming recv_message_ready"); + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = GRPC_CLOSURE_INIT( + &batch_data->recv_message_ready, invoke_recv_message_callback, + retry_state->recv_message_ready_deferred_batch, + grpc_schedule_on_exec_ctx); + closure->error = retry_state->recv_message_error; + closure->reason = "resuming recv_message_ready"; retry_state->recv_message_ready_deferred_batch = nullptr; } } } -// Returns true if any op in the batch was not yet started. -// Only looks at send ops, since recv ops are always started immediately. -static bool pending_batch_is_unstarted( - pending_batch* pending, call_data* calld, - subchannel_call_retry_state* retry_state) { - if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { - return false; - } - if (pending->batch->send_initial_metadata && - !retry_state->started_send_initial_metadata) { - return true; - } - if (pending->batch->send_message && - retry_state->started_send_message_count < calld->send_messages->size()) { - return true; - } - if (pending->batch->send_trailing_metadata && - !retry_state->started_send_trailing_metadata) { - return true; - } - return false; -} - -// For any pending batch containing an op that has not yet been started, -// adds the pending batch's completion closures to closures. -static void add_closures_to_fail_unstarted_pending_batches( - grpc_call_element* elem, subchannel_call_retry_state* retry_state, - grpc_error* error, grpc_core::CallCombinerClosureList* closures) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { - pending_batch* pending = &calld->pending_batches[i]; - if (pending_batch_is_unstarted(pending, calld, retry_state)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: failing unstarted pending batch at index " - "%" PRIuPTR, - chand, calld, i); - } - closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error), - "failing on_complete for pending batch"); - pending->batch->on_complete = nullptr; - maybe_clear_pending_batch(elem, pending); - } - } - GRPC_ERROR_UNREF(error); -} - -// Runs necessary closures upon completion of a call attempt. -static void run_closures_for_completed_call(subchannel_batch_data* batch_data, - grpc_error* error) { - grpc_call_element* elem = batch_data->elem; - call_data* calld = static_cast<call_data*>(elem->call_data); - subchannel_call_retry_state* retry_state = - static_cast<subchannel_call_retry_state*>( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); - // Construct list of closures to execute. - grpc_core::CallCombinerClosureList closures; - // First, add closure for recv_trailing_metadata_ready. - add_closure_for_recv_trailing_metadata_ready( - elem, batch_data, GRPC_ERROR_REF(error), &closures); - // If there are deferred recv_initial_metadata_ready or recv_message_ready - // callbacks, add them to closures. - add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures); - // Add closures to fail any pending batches that have not yet been started. - add_closures_to_fail_unstarted_pending_batches( - elem, retry_state, GRPC_ERROR_REF(error), &closures); - // Don't need batch_data anymore. - batch_data_unref(batch_data); - // Schedule all of the closures identified above. - // Note: This will release the call combiner. - closures.RunClosures(calld->call_combiner); - GRPC_ERROR_UNREF(error); -} - -// Intercepts recv_trailing_metadata_ready callback for retries. -// Commits the call and returns the trailing metadata up the stack. -static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { - subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); - grpc_call_element* elem = batch_data->elem; - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s", - chand, calld, grpc_error_string(error)); - } - subchannel_call_retry_state* retry_state = - static_cast<subchannel_call_retry_state*>( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); - retry_state->completed_recv_trailing_metadata = true; - // Get the call's status and check for server pushback metadata. - grpc_status_code status = GRPC_STATUS_OK; - grpc_mdelem* server_pushback_md = nullptr; - get_call_status(batch_data, GRPC_ERROR_REF(error), &status, - &server_pushback_md); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, - calld, grpc_status_code_to_string(status)); - } - // Check if we should retry. - if (maybe_retry(elem, batch_data, status, server_pushback_md)) { - // Unref batch_data for deferred recv_initial_metadata_ready or - // recv_message_ready callbacks, if any. - if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) { - batch_data_unref(batch_data); - GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); - } - if (retry_state->recv_message_ready_deferred_batch != nullptr) { - batch_data_unref(batch_data); - GRPC_ERROR_UNREF(retry_state->recv_message_error); - } - batch_data_unref(batch_data); - return; - } - // Not retrying, so commit the call. - retry_commit(elem, retry_state); - // Run any necessary closures. - run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error)); -} - -// -// on_complete callback handling -// - -// Adds the on_complete closure for the pending batch completed in -// batch_data to closures. -static void add_closure_for_completed_pending_batch( - grpc_call_element* elem, subchannel_batch_data* batch_data, - subchannel_call_retry_state* retry_state, grpc_error* error, - grpc_core::CallCombinerClosureList* closures) { - pending_batch* pending = pending_batch_find( - elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) { - // Match the pending batch with the same set of send ops as the - // subchannel batch we've just completed. - return batch->on_complete != nullptr && - batch_data->batch.send_initial_metadata == - batch->send_initial_metadata && - batch_data->batch.send_message == batch->send_message && - batch_data->batch.send_trailing_metadata == - batch->send_trailing_metadata; - }); - // If batch_data is a replay batch, then there will be no pending - // batch to complete. - if (pending == nullptr) { - GRPC_ERROR_UNREF(error); - return; - } - // Add closure. - closures->Add(pending->batch->on_complete, error, - "on_complete for pending batch"); - pending->batch->on_complete = nullptr; - maybe_clear_pending_batch(elem, pending); -} - // If there are any cached ops to replay or pending ops to start on the // subchannel call, adds a closure to closures to invoke -// start_retriable_subchannel_batches(). +// start_retriable_subchannel_batches(), updating *num_closures as needed. static void add_closures_for_replay_or_pending_send_ops( grpc_call_element* elem, subchannel_batch_data* batch_data, - subchannel_call_retry_state* retry_state, - grpc_core::CallCombinerClosureList* closures) { + subchannel_call_retry_state* retry_state, closure_to_execute* closures, + size_t* num_closures) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); bool have_pending_send_message_ops = @@ -2001,12 +1929,93 @@ static void add_closures_for_replay_or_pending_send_ops( "chand=%p calld=%p: starting next batch for pending send op(s)", chand, calld); } - GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure, - start_retriable_subchannel_batches, elem, - grpc_schedule_on_exec_ctx); - closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE, - "starting next batch for send_* op(s)"); + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = GRPC_CLOSURE_INIT( + &batch_data->batch.handler_private.closure, + start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx); + closure->error = GRPC_ERROR_NONE; + closure->reason = "starting next batch for send_* op(s)"; + } +} + +// For any pending batch completed in batch_data, adds the necessary +// completion closures to closures, updating *num_closures as needed. +static void add_closures_for_completed_pending_batches( + grpc_call_element* elem, subchannel_batch_data* batch_data, + subchannel_call_retry_state* retry_state, grpc_error* error, + closure_to_execute* closures, size_t* num_closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + if (pending_batch_is_completed(pending, calld, retry_state)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: pending batch completed at index %" PRIuPTR, + chand, calld, i); + } + // Copy the trailing metadata to return it to the surface. + if (batch_data->batch.recv_trailing_metadata) { + grpc_metadata_batch_move(&batch_data->recv_trailing_metadata, + pending->batch->payload->recv_trailing_metadata + .recv_trailing_metadata); + } + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = pending->batch->on_complete; + closure->error = GRPC_ERROR_REF(error); + closure->reason = "on_complete for pending batch"; + pending->batch->on_complete = nullptr; + maybe_clear_pending_batch(elem, pending); + } } + GRPC_ERROR_UNREF(error); +} + +// For any pending batch containing an op that has not yet been started, +// adds the pending batch's completion closures to closures, updating +// *num_closures as needed. +static void add_closures_to_fail_unstarted_pending_batches( + grpc_call_element* elem, subchannel_call_retry_state* retry_state, + grpc_error* error, closure_to_execute* closures, size_t* num_closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + if (pending_batch_is_unstarted(pending, calld, retry_state)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: failing unstarted pending batch at index " + "%" PRIuPTR, + chand, calld, i); + } + if (pending->batch->recv_initial_metadata) { + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = pending->batch->payload->recv_initial_metadata + .recv_initial_metadata_ready; + closure->error = GRPC_ERROR_REF(error); + closure->reason = + "failing recv_initial_metadata_ready for pending batch"; + pending->batch->payload->recv_initial_metadata + .recv_initial_metadata_ready = nullptr; + } + if (pending->batch->recv_message) { + *pending->batch->payload->recv_message.recv_message = nullptr; + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = + pending->batch->payload->recv_message.recv_message_ready; + closure->error = GRPC_ERROR_REF(error); + closure->reason = "failing recv_message_ready for pending batch"; + pending->batch->payload->recv_message.recv_message_ready = nullptr; + } + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = pending->batch->on_complete; + closure->error = GRPC_ERROR_REF(error); + closure->reason = "failing on_complete for pending batch"; + pending->batch->on_complete = nullptr; + maybe_clear_pending_batch(elem, pending); + } + } + GRPC_ERROR_UNREF(error); } // Callback used to intercept on_complete from subchannel calls. @@ -2026,49 +2035,136 @@ static void on_complete(void* arg, grpc_error* error) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); + // If we have previously completed recv_trailing_metadata, then the + // call is finished. + bool call_finished = retry_state->completed_recv_trailing_metadata; + // Record whether we were already committed before receiving this callback. + const bool previously_committed = calld->retry_committed; // Update bookkeeping in retry_state. - if (batch_data->batch.send_initial_metadata) { - retry_state->completed_send_initial_metadata = true; - } - if (batch_data->batch.send_message) { - ++retry_state->completed_send_message_count; - } - if (batch_data->batch.send_trailing_metadata) { - retry_state->completed_send_trailing_metadata = true; + update_retry_state_for_completed_batch(batch_data, retry_state); + if (call_finished) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand, + calld); + } + } else { + // Check if this batch finished the call, and if so, get its status. + // The call is finished if either (a) this callback was invoked with + // an error or (b) we receive status. + grpc_status_code status = GRPC_STATUS_OK; + grpc_mdelem* server_pushback_md = nullptr; + if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a). + call_finished = true; + grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, + nullptr); + } else if (batch_data->batch.recv_trailing_metadata) { // Case (b). + call_finished = true; + grpc_metadata_batch* md_batch = + batch_data->batch.payload->recv_trailing_metadata + .recv_trailing_metadata; + GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); + status = grpc_get_status_code_from_metadata( + md_batch->idx.named.grpc_status->md); + if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; + } + } + // If the call just finished, check if we should retry. + if (call_finished) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, + calld, grpc_status_code_to_string(status)); + } + if (maybe_retry(elem, batch_data, status, server_pushback_md)) { + // Unref batch_data for deferred recv_initial_metadata_ready or + // recv_message_ready callbacks, if any. + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_initial_metadata_ready_deferred_batch != + nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); + } + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_message_ready_deferred_batch != nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_message_error); + } + // Track number of pending subchannel send batches and determine if + // this was the last one. + bool last_callback_complete = false; + if (batch_data->batch.send_initial_metadata || + batch_data->batch.send_message || + batch_data->batch.send_trailing_metadata) { + --calld->num_pending_retriable_subchannel_send_batches; + last_callback_complete = + calld->num_pending_retriable_subchannel_send_batches == 0; + } + batch_data_unref(batch_data); + // If we just completed the last subchannel send batch, unref the + // call stack. + if (last_callback_complete) { + GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); + } + return; + } + // Not retrying, so commit the call. + retry_commit(elem, retry_state); + } } - // If the call is committed, free cached data for send ops that we've just - // completed. - if (calld->retry_committed) { + // If we were already committed before receiving this callback, free + // cached data for send ops that we've just completed. (If the call has + // just now finished, the call to retry_commit() above will have freed all + // cached send ops, so we don't need to do it here.) + if (previously_committed) { free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state); } + // Call not being retried. // Construct list of closures to execute. - grpc_core::CallCombinerClosureList closures; - // If a retry was already dispatched, that means we saw - // recv_trailing_metadata before this, so we do nothing here. - // Otherwise, invoke the callback to return the result to the surface. - if (!retry_state->retry_dispatched) { - // Add closure for the completed pending batch, if any. - add_closure_for_completed_pending_batch(elem, batch_data, retry_state, - GRPC_ERROR_REF(error), &closures); - // If needed, add a callback to start any replay or pending send ops on - // the subchannel call. - if (!retry_state->completed_recv_trailing_metadata) { - add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state, - &closures); - } + // Max number of closures is number of pending batches plus one for + // each of: + // - recv_initial_metadata_ready (either deferred or unstarted) + // - recv_message_ready (either deferred or unstarted) + // - starting a new batch for pending send ops + closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3]; + size_t num_closures = 0; + // If there are deferred recv_initial_metadata_ready or recv_message_ready + // callbacks, add them to closures. + add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures, + &num_closures); + // Find pending batches whose ops are now complete and add their + // on_complete callbacks to closures. + add_closures_for_completed_pending_batches(elem, batch_data, retry_state, + GRPC_ERROR_REF(error), closures, + &num_closures); + // Add closures to handle any pending batches that have not yet been started. + // If the call is finished, we fail these batches; otherwise, we add a + // callback to start_retriable_subchannel_batches() to start them on + // the subchannel call. + if (call_finished) { + add_closures_to_fail_unstarted_pending_batches( + elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures); + } else { + add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state, + closures, &num_closures); } // Track number of pending subchannel send batches and determine if this // was the last one. - --calld->num_pending_retriable_subchannel_send_batches; - const bool last_send_batch_complete = - calld->num_pending_retriable_subchannel_send_batches == 0; + bool last_callback_complete = false; + if (batch_data->batch.send_initial_metadata || + batch_data->batch.send_message || + batch_data->batch.send_trailing_metadata) { + --calld->num_pending_retriable_subchannel_send_batches; + last_callback_complete = + calld->num_pending_retriable_subchannel_send_batches == 0; + } // Don't need batch_data anymore. batch_data_unref(batch_data); // Schedule all of the closures identified above. // Note: This yeilds the call combiner. - closures.RunClosures(calld->call_combiner); - // If this was the last subchannel send batch, unref the call stack. - if (last_send_batch_complete) { + execute_closures_in_call_combiner(elem, "on_complete", closures, + num_closures); + // If we just completed the last subchannel send batch, unref the call stack. + if (last_callback_complete) { GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); } } @@ -2089,22 +2185,27 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { // Adds a closure to closures that will execute batch in the call combiner. static void add_closure_for_subchannel_batch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch, - grpc_core::CallCombinerClosureList* closures) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); + call_data* calld, grpc_transport_stream_op_batch* batch, + closure_to_execute* closures, size_t* num_closures) { batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = &batch->handler_private.closure; + closure->error = GRPC_ERROR_NONE; + // If the tracer is enabled, we log a more detailed message, which + // requires dynamic allocation. This will be freed in + // start_retriable_subchannel_batches(). if (grpc_client_channel_trace.enabled()) { char* batch_str = grpc_transport_stream_op_batch_string(batch); - gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand, - calld, batch_str); + gpr_asprintf(const_cast<char**>(&closure->reason), + "starting batch in call combiner: %s", batch_str); gpr_free(batch_str); + closure->free_reason = true; + } else { + closure->reason = "start_subchannel_batch"; } - closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, - "start_subchannel_batch"); } // Adds retriable send_initial_metadata op to batch_data. @@ -2240,13 +2341,9 @@ static void add_retriable_recv_trailing_metadata_op( grpc_metadata_batch_init(&batch_data->recv_trailing_metadata); batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata = &batch_data->recv_trailing_metadata; - batch_data->batch.payload->recv_trailing_metadata.collect_stats = + batch_data->batch.collect_stats = true; + batch_data->batch.payload->collect_stats.collect_stats = &batch_data->collect_stats; - GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready, - recv_trailing_metadata_ready, batch_data, - grpc_schedule_on_exec_ctx); - batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready; } // Helper function used to start a recv_trailing_metadata batch. This @@ -2267,11 +2364,9 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); // Create batch_data with 2 refs, since this batch will be unreffed twice: - // once for the recv_trailing_metadata_ready callback when the subchannel - // batch returns, and again when we actually get a recv_trailing_metadata - // op from the surface. - subchannel_batch_data* batch_data = - batch_data_create(elem, 2, false /* set_on_complete */); + // once when the subchannel batch returns, and again when we actually get + // a recv_trailing_metadata op from the surface. + subchannel_batch_data* batch_data = batch_data_create(elem, 2); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); retry_state->recv_trailing_metadata_internal_batch = batch_data; // Note: This will release the call combiner. @@ -2296,7 +2391,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( "send_initial_metadata op", chand, calld); } - replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); + replay_batch_data = batch_data_create(elem, 1); add_retriable_send_initial_metadata_op(calld, retry_state, replay_batch_data); } @@ -2313,8 +2408,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( chand, calld); } if (replay_batch_data == nullptr) { - replay_batch_data = - batch_data_create(elem, 1, true /* set_on_complete */); + replay_batch_data = batch_data_create(elem, 1); } add_retriable_send_message_op(elem, retry_state, replay_batch_data); } @@ -2333,8 +2427,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( chand, calld); } if (replay_batch_data == nullptr) { - replay_batch_data = - batch_data_create(elem, 1, true /* set_on_complete */); + replay_batch_data = batch_data_create(elem, 1); } add_retriable_send_trailing_metadata_op(calld, retry_state, replay_batch_data); @@ -2346,7 +2439,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( // *num_batches as needed. static void add_subchannel_batches_for_pending_batches( grpc_call_element* elem, subchannel_call_retry_state* retry_state, - grpc_core::CallCombinerClosureList* closures) { + closure_to_execute* closures, size_t* num_closures) { call_data* calld = static_cast<call_data*>(elem->call_data); for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; @@ -2402,11 +2495,13 @@ static void add_subchannel_batches_for_pending_batches( if (retry_state->completed_recv_trailing_metadata) { subchannel_batch_data* batch_data = retry_state->recv_trailing_metadata_internal_batch; + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = &batch_data->on_complete; // Batches containing recv_trailing_metadata always succeed. - closures->Add( - &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE, - "re-executing recv_trailing_metadata_ready to propagate " - "internally triggered result"); + closure->error = GRPC_ERROR_NONE; + closure->reason = + "re-executing on_complete for recv_trailing_metadata " + "to propagate internally triggered result"; } else { batch_data_unref(retry_state->recv_trailing_metadata_internal_batch); } @@ -2418,19 +2513,14 @@ static void add_subchannel_batches_for_pending_batches( if (calld->method_params == nullptr || calld->method_params->retry_policy() == nullptr || calld->retry_committed) { - add_closure_for_subchannel_batch(elem, batch, closures); + add_closure_for_subchannel_batch(calld, batch, closures, num_closures); pending_batch_clear(calld, pending); continue; } // Create batch with the right number of callbacks. - const bool has_send_ops = batch->send_initial_metadata || - batch->send_message || - batch->send_trailing_metadata; - const int num_callbacks = has_send_ops + batch->recv_initial_metadata + - batch->recv_message + - batch->recv_trailing_metadata; - subchannel_batch_data* batch_data = batch_data_create( - elem, num_callbacks, has_send_ops /* set_on_complete */); + const int num_callbacks = + 1 + batch->recv_initial_metadata + batch->recv_message; + subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks); // Cache send ops if needed. maybe_cache_send_ops_for_batch(calld, pending); // send_initial_metadata. @@ -2457,9 +2547,11 @@ static void add_subchannel_batches_for_pending_batches( } // recv_trailing_metadata. if (batch->recv_trailing_metadata) { + GPR_ASSERT(batch->collect_stats); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); } - add_closure_for_subchannel_batch(elem, &batch_data->batch, closures); + add_closure_for_subchannel_batch(calld, &batch_data->batch, closures, + num_closures); // Track number of pending subchannel send batches. // If this is the first one, take a ref to the call stack. if (batch->send_initial_metadata || batch->send_message || @@ -2487,13 +2579,15 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); // Construct list of closures to execute, one for each pending batch. - grpc_core::CallCombinerClosureList closures; + // We can start up to 6 batches. + closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)]; + size_t num_closures = 0; // Replay previously-returned send_* ops if needed. subchannel_batch_data* replay_batch_data = maybe_create_subchannel_batch_for_replay(elem, retry_state); if (replay_batch_data != nullptr) { - add_closure_for_subchannel_batch(elem, &replay_batch_data->batch, - &closures); + add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures, + &num_closures); // Track number of pending subchannel send batches. // If this is the first one, take a ref to the call stack. if (calld->num_pending_retriable_subchannel_send_batches == 0) { @@ -2502,16 +2596,17 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { ++calld->num_pending_retriable_subchannel_send_batches; } // Now add pending batches. - add_subchannel_batches_for_pending_batches(elem, retry_state, &closures); + add_subchannel_batches_for_pending_batches(elem, retry_state, closures, + &num_closures); // Start batches on subchannel call. if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " retriable batches on subchannel_call=%p", - chand, calld, closures.size(), calld->subchannel_call); + chand, calld, num_closures, calld->subchannel_call); } - // Note: This will yield the call combiner. - closures.RunClosures(calld->call_combiner); + execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches", + closures, num_closures); } // |