diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 891 |
1 files changed, 398 insertions, 493 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index f141aabe70..8a29c80729 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -804,6 +804,7 @@ typedef struct { // For intercepting recv_trailing_metadata. grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; + grpc_closure recv_trailing_metadata_ready; // For intercepting on_complete. grpc_closure on_complete; } subchannel_batch_data; @@ -1179,35 +1180,24 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error, "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", elem->channel_data, calld, num_batches, grpc_error_string(error)); } - grpc_transport_stream_op_batch* - batches[GPR_ARRAY_SIZE(calld->pending_batches)]; - size_t num_batches = 0; + grpc_core::CallCombinerClosureList closures; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - batches[num_batches++] = batch; + batch->handler_private.extra_arg = calld; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + fail_pending_batch_in_call_combiner, batch, + grpc_schedule_on_exec_ctx); + closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error), + "pending_batches_fail"); pending_batch_clear(calld, pending); } } - for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) { - grpc_transport_stream_op_batch* batch = batches[i]; - batch->handler_private.extra_arg = calld; - GRPC_CLOSURE_INIT(&batch->handler_private.closure, - fail_pending_batch_in_call_combiner, batch, - grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(calld->call_combiner, - &batch->handler_private.closure, - GRPC_ERROR_REF(error), "pending_batches_fail"); - } if (yield_call_combiner) { - if (num_batches > 0) { - // Note: This will release the call combiner. - grpc_transport_stream_op_batch_finish_with_failure( - batches[0], GRPC_ERROR_REF(error), calld->call_combiner); - } else { - GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail"); - } + closures.RunClosures(calld->call_combiner); + } else { + closures.RunClosuresWithoutYielding(calld->call_combiner); } GRPC_ERROR_UNREF(error); } @@ -1242,30 +1232,22 @@ static void pending_batches_resume(grpc_call_element* elem) { " pending batches on subchannel_call=%p", chand, calld, num_batches, calld->subchannel_call); } - grpc_transport_stream_op_batch* - batches[GPR_ARRAY_SIZE(calld->pending_batches)]; - size_t num_batches = 0; + grpc_core::CallCombinerClosureList closures; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - batches[num_batches++] = batch; + batch->handler_private.extra_arg = calld->subchannel_call; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + resume_pending_batch_in_call_combiner, batch, + grpc_schedule_on_exec_ctx); + closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, + "pending_batches_resume"); pending_batch_clear(calld, pending); } } - for (size_t i = 1; i < num_batches; ++i) { - grpc_transport_stream_op_batch* batch = batches[i]; - batch->handler_private.extra_arg = calld->subchannel_call; - GRPC_CLOSURE_INIT(&batch->handler_private.closure, - resume_pending_batch_in_call_combiner, batch, - grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(calld->call_combiner, - &batch->handler_private.closure, GRPC_ERROR_NONE, - "pending_batches_resume"); - } - GPR_ASSERT(num_batches > 0); // Note: This will release the call combiner. - grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]); + closures.RunClosures(calld->call_combiner); } static void maybe_clear_pending_batch(grpc_call_element* elem, @@ -1280,7 +1262,10 @@ static void maybe_clear_pending_batch(grpc_call_element* elem, batch->payload->recv_initial_metadata.recv_initial_metadata_ready == nullptr) && (!batch->recv_message || - batch->payload->recv_message.recv_message_ready == nullptr)) { + batch->payload->recv_message.recv_message_ready == nullptr) && + (!batch->recv_trailing_metadata || + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready == + nullptr)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand, calld); @@ -1289,75 +1274,27 @@ static void maybe_clear_pending_batch(grpc_call_element* elem, } } -// Returns true if all ops in the pending batch have been completed. -static bool pending_batch_is_completed( - pending_batch* pending, call_data* calld, - subchannel_call_retry_state* retry_state) { - if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { - return false; - } - if (pending->batch->send_initial_metadata && - !retry_state->completed_send_initial_metadata) { - return false; - } - if (pending->batch->send_message && - retry_state->completed_send_message_count < - calld->send_messages->size()) { - return false; - } - if (pending->batch->send_trailing_metadata && - !retry_state->completed_send_trailing_metadata) { - return false; - } - if (pending->batch->recv_initial_metadata && - !retry_state->completed_recv_initial_metadata) { - return false; - } - if (pending->batch->recv_message && - retry_state->completed_recv_message_count < - retry_state->started_recv_message_count) { - return false; - } - if (pending->batch->recv_trailing_metadata && - !retry_state->completed_recv_trailing_metadata) { - return false; - } - return true; -} - -// Returns true if any op in the batch was not yet started. -static bool pending_batch_is_unstarted( - pending_batch* pending, call_data* calld, - subchannel_call_retry_state* retry_state) { - if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { - return false; - } - if (pending->batch->send_initial_metadata && - !retry_state->started_send_initial_metadata) { - return true; - } - if (pending->batch->send_message && - retry_state->started_send_message_count < calld->send_messages->size()) { - return true; - } - if (pending->batch->send_trailing_metadata && - !retry_state->started_send_trailing_metadata) { - return true; - } - if (pending->batch->recv_initial_metadata && - !retry_state->started_recv_initial_metadata) { - return true; - } - if (pending->batch->recv_message && - retry_state->completed_recv_message_count == - retry_state->started_recv_message_count) { - return true; - } - if (pending->batch->recv_trailing_metadata && - !retry_state->started_recv_trailing_metadata) { - return true; +// Returns a pointer to the first pending batch for which predicate(batch) +// returns true, or null if not found. +template <typename Predicate> +static pending_batch* pending_batch_find(grpc_call_element* elem, + const char* log_message, + Predicate predicate) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + grpc_transport_stream_op_batch* batch = pending->batch; + if (batch != nullptr && predicate(batch)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand, + calld, log_message, i); + } + return pending; + } } - return false; + return nullptr; } // @@ -1544,8 +1481,13 @@ static bool maybe_retry(grpc_call_element* elem, // subchannel_batch_data // +// Creates a subchannel_batch_data object on the call's arena with the +// specified refcount. If set_on_complete is true, the batch's +// on_complete callback will be set to point to on_complete(); +// otherwise, the batch's on_complete callback will be null. static subchannel_batch_data* batch_data_create(grpc_call_element* elem, - int refcount) { + int refcount, + bool set_on_complete) { call_data* calld = static_cast<call_data*>(elem->call_data); subchannel_call_retry_state* retry_state = static_cast<subchannel_call_retry_state*>( @@ -1558,9 +1500,11 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem, GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create"); batch_data->batch.payload = &retry_state->batch_payload; gpr_ref_init(&batch_data->refs, refcount); - GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data, - grpc_schedule_on_exec_ctx); - batch_data->batch.on_complete = &batch_data->on_complete; + if (set_on_complete) { + GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data, + grpc_schedule_on_exec_ctx); + batch_data->batch.on_complete = &batch_data->on_complete; + } GRPC_CALL_STACK_REF(calld->owning_call, "batch_data"); return batch_data; } @@ -1593,26 +1537,14 @@ static void batch_data_unref(subchannel_batch_data* batch_data) { static void invoke_recv_initial_metadata_callback(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); - channel_data* chand = - static_cast<channel_data*>(batch_data->elem->channel_data); - call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); // Find pending batch. - pending_batch* pending = nullptr; - for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { - grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch; - if (batch != nullptr && batch->recv_initial_metadata && - batch->payload->recv_initial_metadata.recv_initial_metadata_ready != - nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: invoking recv_initial_metadata_ready for " - "pending batch at index %" PRIuPTR, - chand, calld, i); - } - pending = &calld->pending_batches[i]; - break; - } - } + pending_batch* pending = pending_batch_find( + batch_data->elem, "invoking recv_initial_metadata_ready for", + [](grpc_transport_stream_op_batch* batch) { + return batch->recv_initial_metadata && + batch->payload->recv_initial_metadata + .recv_initial_metadata_ready != nullptr; + }); GPR_ASSERT(pending != nullptr); // Return metadata. grpc_metadata_batch_move( @@ -1648,10 +1580,19 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); + retry_state->completed_recv_initial_metadata = true; + // If a retry was already dispatched, then we're not going to use the + // result of this recv_initial_metadata op, so do nothing. + if (retry_state->retry_dispatched) { + GRPC_CALL_COMBINER_STOP( + calld->call_combiner, + "recv_initial_metadata_ready after retry dispatched"); + return; + } // If we got an error or a Trailers-Only response and have not yet gotten - // the recv_trailing_metadata on_complete callback, then defer - // propagating this callback back to the surface. We can evaluate whether - // to retry when recv_trailing_metadata comes back. + // the recv_trailing_metadata_ready callback, then defer propagating this + // callback back to the surface. We can evaluate whether to retry when + // recv_trailing_metadata comes back. if (GPR_UNLIKELY((batch_data->trailing_metadata_available || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { @@ -1676,9 +1617,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { } // Received valid initial metadata, so commit the call. retry_commit(elem, retry_state); + // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. invoke_recv_initial_metadata_callback(batch_data, error); - GRPC_ERROR_UNREF(error); } // @@ -1688,25 +1629,13 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { // Invokes recv_message_ready for a subchannel batch. static void invoke_recv_message_callback(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); - channel_data* chand = - static_cast<channel_data*>(batch_data->elem->channel_data); - call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); // Find pending op. - pending_batch* pending = nullptr; - for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { - grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch; - if (batch != nullptr && batch->recv_message && - batch->payload->recv_message.recv_message_ready != nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: invoking recv_message_ready for " - "pending batch at index %" PRIuPTR, - chand, calld, i); - } - pending = &calld->pending_batches[i]; - break; - } - } + pending_batch* pending = pending_batch_find( + batch_data->elem, "invoking recv_message_ready for", + [](grpc_transport_stream_op_batch* batch) { + return batch->recv_message && + batch->payload->recv_message.recv_message_ready != nullptr; + }); GPR_ASSERT(pending != nullptr); // Return payload. *pending->batch->payload->recv_message.recv_message = @@ -1738,10 +1667,18 @@ static void recv_message_ready(void* arg, grpc_error* error) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); + ++retry_state->completed_recv_message_count; + // If a retry was already dispatched, then we're not going to use the + // result of this recv_message op, so do nothing. + if (retry_state->retry_dispatched) { + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "recv_message_ready after retry dispatched"); + return; + } // If we got an error or the payload was nullptr and we have not yet gotten - // the recv_trailing_metadata on_complete callback, then defer - // propagating this callback back to the surface. We can evaluate whether - // to retry when recv_trailing_metadata comes back. + // the recv_trailing_metadata_ready callback, then defer propagating this + // callback back to the surface. We can evaluate whether to retry when + // recv_trailing_metadata comes back. if (GPR_UNLIKELY( (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { @@ -1764,133 +1701,268 @@ static void recv_message_ready(void* arg, grpc_error* error) { } // Received a valid message, so commit the call. retry_commit(elem, retry_state); + // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. invoke_recv_message_callback(batch_data, error); - GRPC_ERROR_UNREF(error); } // -// list of closures to execute in call combiner +// recv_trailing_metadata handling // -// Represents a closure that needs to run in the call combiner as part of -// starting or completing a batch. -typedef struct { - grpc_closure* closure; - grpc_error* error; - const char* reason; - bool free_reason = false; -} closure_to_execute; - -static void execute_closures_in_call_combiner(grpc_call_element* elem, - const char* caller, - closure_to_execute* closures, - size_t num_closures) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); +// Sets *status and *server_pushback_md based on batch_data and error. +static void get_call_status(subchannel_batch_data* batch_data, + grpc_error* error, grpc_status_code* status, + grpc_mdelem** server_pushback_md) { + grpc_call_element* elem = batch_data->elem; call_data* calld = static_cast<call_data*>(elem->call_data); - // Note that the call combiner will be yielded for each closure that - // we schedule. We're already running in the call combiner, so one of - // the closures can be scheduled directly, but the others will - // have to re-enter the call combiner. - if (num_closures > 0) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand, - calld, caller, closures[0].reason); - } - GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error); - if (closures[0].free_reason) { - gpr_free(const_cast<char*>(closures[0].reason)); - } - for (size_t i = 1; i < num_closures; ++i) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: %s starting closure in call combiner: %s", - chand, calld, caller, closures[i].reason); - } - GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure, - closures[i].error, closures[i].reason); - if (closures[i].free_reason) { - gpr_free(const_cast<char*>(closures[i].reason)); - } - } + if (error != GRPC_ERROR_NONE) { + grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, + nullptr); } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand, - calld, caller); + grpc_metadata_batch* md_batch = + batch_data->batch.payload->recv_trailing_metadata + .recv_trailing_metadata; + GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); + *status = + grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); + if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; } - GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run"); } + GRPC_ERROR_UNREF(error); } -// -// on_complete callback handling -// - -// Updates retry_state to reflect the ops completed in batch_data. -static void update_retry_state_for_completed_batch( - subchannel_batch_data* batch_data, - subchannel_call_retry_state* retry_state) { - if (batch_data->batch.send_initial_metadata) { - retry_state->completed_send_initial_metadata = true; - } - if (batch_data->batch.send_message) { - ++retry_state->completed_send_message_count; - } - if (batch_data->batch.send_trailing_metadata) { - retry_state->completed_send_trailing_metadata = true; - } - if (batch_data->batch.recv_initial_metadata) { - retry_state->completed_recv_initial_metadata = true; - } - if (batch_data->batch.recv_message) { - ++retry_state->completed_recv_message_count; - } - if (batch_data->batch.recv_trailing_metadata) { - retry_state->completed_recv_trailing_metadata = true; +// Adds recv_trailing_metadata_ready closure to closures. +static void add_closure_for_recv_trailing_metadata_ready( + grpc_call_element* elem, subchannel_batch_data* batch_data, + grpc_error* error, grpc_core::CallCombinerClosureList* closures) { + // Find pending batch. + pending_batch* pending = pending_batch_find( + elem, "invoking recv_trailing_metadata for", + [](grpc_transport_stream_op_batch* batch) { + return batch->recv_trailing_metadata && + batch->payload->recv_trailing_metadata + .recv_trailing_metadata_ready != nullptr; + }); + // If we generated the recv_trailing_metadata op internally via + // start_internal_recv_trailing_metadata(), then there will be no + // pending batch. + if (pending == nullptr) { + GRPC_ERROR_UNREF(error); + return; } + // Return metadata. + grpc_metadata_batch_move( + &batch_data->recv_trailing_metadata, + pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); + // Add closure. + closures->Add(pending->batch->payload->recv_trailing_metadata + .recv_trailing_metadata_ready, + error, "recv_trailing_metadata_ready for pending batch"); + // Update bookkeeping. + pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + nullptr; + maybe_clear_pending_batch(elem, pending); } // Adds any necessary closures for deferred recv_initial_metadata and -// recv_message callbacks to closures, updating *num_closures as needed. +// recv_message callbacks to closures. static void add_closures_for_deferred_recv_callbacks( subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, - closure_to_execute* closures, size_t* num_closures) { + grpc_core::CallCombinerClosureList* closures) { if (batch_data->batch.recv_trailing_metadata) { // Add closure for deferred recv_initial_metadata_ready. if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != nullptr)) { - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = GRPC_CLOSURE_INIT( - &batch_data->recv_initial_metadata_ready, - invoke_recv_initial_metadata_callback, - retry_state->recv_initial_metadata_ready_deferred_batch, - grpc_schedule_on_exec_ctx); - closure->error = retry_state->recv_initial_metadata_error; - closure->reason = "resuming recv_initial_metadata_ready"; + GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, + invoke_recv_initial_metadata_callback, + retry_state->recv_initial_metadata_ready_deferred_batch, + grpc_schedule_on_exec_ctx); + closures->Add(&batch_data->recv_initial_metadata_ready, + retry_state->recv_initial_metadata_error, + "resuming recv_initial_metadata_ready"); retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; } // Add closure for deferred recv_message_ready. if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch != nullptr)) { - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = GRPC_CLOSURE_INIT( - &batch_data->recv_message_ready, invoke_recv_message_callback, - retry_state->recv_message_ready_deferred_batch, - grpc_schedule_on_exec_ctx); - closure->error = retry_state->recv_message_error; - closure->reason = "resuming recv_message_ready"; + GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, + invoke_recv_message_callback, + retry_state->recv_message_ready_deferred_batch, + grpc_schedule_on_exec_ctx); + closures->Add(&batch_data->recv_message_ready, + retry_state->recv_message_error, + "resuming recv_message_ready"); retry_state->recv_message_ready_deferred_batch = nullptr; } } } +// Returns true if any op in the batch was not yet started. +// Only looks at send ops, since recv ops are always started immediately. +static bool pending_batch_is_unstarted( + pending_batch* pending, call_data* calld, + subchannel_call_retry_state* retry_state) { + if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { + return false; + } + if (pending->batch->send_initial_metadata && + !retry_state->started_send_initial_metadata) { + return true; + } + if (pending->batch->send_message && + retry_state->started_send_message_count < calld->send_messages->size()) { + return true; + } + if (pending->batch->send_trailing_metadata && + !retry_state->started_send_trailing_metadata) { + return true; + } + return false; +} + +// For any pending batch containing an op that has not yet been started, +// adds the pending batch's completion closures to closures. +static void add_closures_to_fail_unstarted_pending_batches( + grpc_call_element* elem, subchannel_call_retry_state* retry_state, + grpc_error* error, grpc_core::CallCombinerClosureList* closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + if (pending_batch_is_unstarted(pending, calld, retry_state)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: failing unstarted pending batch at index " + "%" PRIuPTR, + chand, calld, i); + } + closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error), + "failing on_complete for pending batch"); + pending->batch->on_complete = nullptr; + maybe_clear_pending_batch(elem, pending); + } + } + GRPC_ERROR_UNREF(error); +} + +// Runs necessary closures upon completion of a call attempt. +static void run_closures_for_completed_call(subchannel_batch_data* batch_data, + grpc_error* error) { + grpc_call_element* elem = batch_data->elem; + call_data* calld = static_cast<call_data*>(elem->call_data); + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); + // Construct list of closures to execute. + grpc_core::CallCombinerClosureList closures; + // First, add closure for recv_trailing_metadata_ready. + add_closure_for_recv_trailing_metadata_ready( + elem, batch_data, GRPC_ERROR_REF(error), &closures); + // If there are deferred recv_initial_metadata_ready or recv_message_ready + // callbacks, add them to closures. + add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures); + // Add closures to fail any pending batches that have not yet been started. + add_closures_to_fail_unstarted_pending_batches( + elem, retry_state, GRPC_ERROR_REF(error), &closures); + // Don't need batch_data anymore. + batch_data_unref(batch_data); + // Schedule all of the closures identified above. + // Note: This will release the call combiner. + closures.RunClosures(calld->call_combiner); + GRPC_ERROR_UNREF(error); +} + +// Intercepts recv_trailing_metadata_ready callback for retries. +// Commits the call and returns the trailing metadata up the stack. +static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { + subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); + grpc_call_element* elem = batch_data->elem; + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s", + chand, calld, grpc_error_string(error)); + } + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); + retry_state->completed_recv_trailing_metadata = true; + // Get the call's status and check for server pushback metadata. + grpc_status_code status = GRPC_STATUS_OK; + grpc_mdelem* server_pushback_md = nullptr; + get_call_status(batch_data, GRPC_ERROR_REF(error), &status, + &server_pushback_md); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, + calld, grpc_status_code_to_string(status)); + } + // Check if we should retry. + if (maybe_retry(elem, batch_data, status, server_pushback_md)) { + // Unref batch_data for deferred recv_initial_metadata_ready or + // recv_message_ready callbacks, if any. + if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); + } + if (retry_state->recv_message_ready_deferred_batch != nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_message_error); + } + batch_data_unref(batch_data); + return; + } + // Not retrying, so commit the call. + retry_commit(elem, retry_state); + // Run any necessary closures. + run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error)); +} + +// +// on_complete callback handling +// + +// Adds the on_complete closure for the pending batch completed in +// batch_data to closures. +static void add_closure_for_completed_pending_batch( + grpc_call_element* elem, subchannel_batch_data* batch_data, + subchannel_call_retry_state* retry_state, grpc_error* error, + grpc_core::CallCombinerClosureList* closures) { + pending_batch* pending = pending_batch_find( + elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) { + // Match the pending batch with the same set of send ops as the + // subchannel batch we've just completed. + return batch->on_complete != nullptr && + batch_data->batch.send_initial_metadata == + batch->send_initial_metadata && + batch_data->batch.send_message == batch->send_message && + batch_data->batch.send_trailing_metadata == + batch->send_trailing_metadata; + }); + // If batch_data is a replay batch, then there will be no pending + // batch to complete. + if (pending == nullptr) { + GRPC_ERROR_UNREF(error); + return; + } + // Add closure. + closures->Add(pending->batch->on_complete, error, + "on_complete for pending batch"); + pending->batch->on_complete = nullptr; + maybe_clear_pending_batch(elem, pending); +} + // If there are any cached ops to replay or pending ops to start on the // subchannel call, adds a closure to closures to invoke -// start_retriable_subchannel_batches(), updating *num_closures as needed. +// start_retriable_subchannel_batches(). static void add_closures_for_replay_or_pending_send_ops( grpc_call_element* elem, subchannel_batch_data* batch_data, - subchannel_call_retry_state* retry_state, closure_to_execute* closures, - size_t* num_closures) { + subchannel_call_retry_state* retry_state, + grpc_core::CallCombinerClosureList* closures) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); bool have_pending_send_message_ops = @@ -1916,93 +1988,12 @@ static void add_closures_for_replay_or_pending_send_ops( "chand=%p calld=%p: starting next batch for pending send op(s)", chand, calld); } - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = GRPC_CLOSURE_INIT( - &batch_data->batch.handler_private.closure, - start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx); - closure->error = GRPC_ERROR_NONE; - closure->reason = "starting next batch for send_* op(s)"; - } -} - -// For any pending batch completed in batch_data, adds the necessary -// completion closures to closures, updating *num_closures as needed. -static void add_closures_for_completed_pending_batches( - grpc_call_element* elem, subchannel_batch_data* batch_data, - subchannel_call_retry_state* retry_state, grpc_error* error, - closure_to_execute* closures, size_t* num_closures) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { - pending_batch* pending = &calld->pending_batches[i]; - if (pending_batch_is_completed(pending, calld, retry_state)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: pending batch completed at index %" PRIuPTR, - chand, calld, i); - } - // Copy the trailing metadata to return it to the surface. - if (batch_data->batch.recv_trailing_metadata) { - grpc_metadata_batch_move(&batch_data->recv_trailing_metadata, - pending->batch->payload->recv_trailing_metadata - .recv_trailing_metadata); - } - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = pending->batch->on_complete; - closure->error = GRPC_ERROR_REF(error); - closure->reason = "on_complete for pending batch"; - pending->batch->on_complete = nullptr; - maybe_clear_pending_batch(elem, pending); - } + GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure, + start_retriable_subchannel_batches, elem, + grpc_schedule_on_exec_ctx); + closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE, + "starting next batch for send_* op(s)"); } - GRPC_ERROR_UNREF(error); -} - -// For any pending batch containing an op that has not yet been started, -// adds the pending batch's completion closures to closures, updating -// *num_closures as needed. -static void add_closures_to_fail_unstarted_pending_batches( - grpc_call_element* elem, subchannel_call_retry_state* retry_state, - grpc_error* error, closure_to_execute* closures, size_t* num_closures) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { - pending_batch* pending = &calld->pending_batches[i]; - if (pending_batch_is_unstarted(pending, calld, retry_state)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: failing unstarted pending batch at index " - "%" PRIuPTR, - chand, calld, i); - } - if (pending->batch->recv_initial_metadata) { - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = pending->batch->payload->recv_initial_metadata - .recv_initial_metadata_ready; - closure->error = GRPC_ERROR_REF(error); - closure->reason = - "failing recv_initial_metadata_ready for pending batch"; - pending->batch->payload->recv_initial_metadata - .recv_initial_metadata_ready = nullptr; - } - if (pending->batch->recv_message) { - *pending->batch->payload->recv_message.recv_message = nullptr; - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = - pending->batch->payload->recv_message.recv_message_ready; - closure->error = GRPC_ERROR_REF(error); - closure->reason = "failing recv_message_ready for pending batch"; - pending->batch->payload->recv_message.recv_message_ready = nullptr; - } - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = pending->batch->on_complete; - closure->error = GRPC_ERROR_REF(error); - closure->reason = "failing on_complete for pending batch"; - pending->batch->on_complete = nullptr; - maybe_clear_pending_batch(elem, pending); - } - } - GRPC_ERROR_UNREF(error); } // Callback used to intercept on_complete from subchannel calls. @@ -2022,136 +2013,49 @@ static void on_complete(void* arg, grpc_error* error) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); - // If we have previously completed recv_trailing_metadata, then the - // call is finished. - bool call_finished = retry_state->completed_recv_trailing_metadata; - // Record whether we were already committed before receiving this callback. - const bool previously_committed = calld->retry_committed; // Update bookkeeping in retry_state. - update_retry_state_for_completed_batch(batch_data, retry_state); - if (call_finished) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand, - calld); - } - } else { - // Check if this batch finished the call, and if so, get its status. - // The call is finished if either (a) this callback was invoked with - // an error or (b) we receive status. - grpc_status_code status = GRPC_STATUS_OK; - grpc_mdelem* server_pushback_md = nullptr; - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a). - call_finished = true; - grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, - nullptr); - } else if (batch_data->batch.recv_trailing_metadata) { // Case (b). - call_finished = true; - grpc_metadata_batch* md_batch = - batch_data->batch.payload->recv_trailing_metadata - .recv_trailing_metadata; - GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); - status = grpc_get_status_code_from_metadata( - md_batch->idx.named.grpc_status->md); - if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { - server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; - } - } - // If the call just finished, check if we should retry. - if (call_finished) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, - calld, grpc_status_code_to_string(status)); - } - if (maybe_retry(elem, batch_data, status, server_pushback_md)) { - // Unref batch_data for deferred recv_initial_metadata_ready or - // recv_message_ready callbacks, if any. - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_initial_metadata_ready_deferred_batch != - nullptr) { - batch_data_unref(batch_data); - GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); - } - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_message_ready_deferred_batch != nullptr) { - batch_data_unref(batch_data); - GRPC_ERROR_UNREF(retry_state->recv_message_error); - } - // Track number of pending subchannel send batches and determine if - // this was the last one. - bool last_callback_complete = false; - if (batch_data->batch.send_initial_metadata || - batch_data->batch.send_message || - batch_data->batch.send_trailing_metadata) { - --calld->num_pending_retriable_subchannel_send_batches; - last_callback_complete = - calld->num_pending_retriable_subchannel_send_batches == 0; - } - batch_data_unref(batch_data); - // If we just completed the last subchannel send batch, unref the - // call stack. - if (last_callback_complete) { - GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); - } - return; - } - // Not retrying, so commit the call. - retry_commit(elem, retry_state); - } + if (batch_data->batch.send_initial_metadata) { + retry_state->completed_send_initial_metadata = true; + } + if (batch_data->batch.send_message) { + ++retry_state->completed_send_message_count; } - // If we were already committed before receiving this callback, free - // cached data for send ops that we've just completed. (If the call has - // just now finished, the call to retry_commit() above will have freed all - // cached send ops, so we don't need to do it here.) - if (previously_committed) { + if (batch_data->batch.send_trailing_metadata) { + retry_state->completed_send_trailing_metadata = true; + } + // If the call is committed, free cached data for send ops that we've just + // completed. + if (calld->retry_committed) { free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state); } - // Call not being retried. // Construct list of closures to execute. - // Max number of closures is number of pending batches plus one for - // each of: - // - recv_initial_metadata_ready (either deferred or unstarted) - // - recv_message_ready (either deferred or unstarted) - // - starting a new batch for pending send ops - closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3]; - size_t num_closures = 0; - // If there are deferred recv_initial_metadata_ready or recv_message_ready - // callbacks, add them to closures. - add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures, - &num_closures); - // Find pending batches whose ops are now complete and add their - // on_complete callbacks to closures. - add_closures_for_completed_pending_batches(elem, batch_data, retry_state, - GRPC_ERROR_REF(error), closures, - &num_closures); - // Add closures to handle any pending batches that have not yet been started. - // If the call is finished, we fail these batches; otherwise, we add a - // callback to start_retriable_subchannel_batches() to start them on - // the subchannel call. - if (call_finished) { - add_closures_to_fail_unstarted_pending_batches( - elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures); - } else { - add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state, - closures, &num_closures); + grpc_core::CallCombinerClosureList closures; + // If a retry was already dispatched, that means we saw + // recv_trailing_metadata before this, so we do nothing here. + // Otherwise, invoke the callback to return the result to the surface. + if (!retry_state->retry_dispatched) { + // Add closure for the completed pending batch, if any. + add_closure_for_completed_pending_batch(elem, batch_data, retry_state, + GRPC_ERROR_REF(error), &closures); + // If needed, add a callback to start any replay or pending send ops on + // the subchannel call. + if (!retry_state->completed_recv_trailing_metadata) { + add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state, + &closures); + } } // Track number of pending subchannel send batches and determine if this // was the last one. - bool last_callback_complete = false; - if (batch_data->batch.send_initial_metadata || - batch_data->batch.send_message || - batch_data->batch.send_trailing_metadata) { - --calld->num_pending_retriable_subchannel_send_batches; - last_callback_complete = - calld->num_pending_retriable_subchannel_send_batches == 0; - } + --calld->num_pending_retriable_subchannel_send_batches; + const bool last_send_batch_complete = + calld->num_pending_retriable_subchannel_send_batches == 0; // Don't need batch_data anymore. batch_data_unref(batch_data); // Schedule all of the closures identified above. // Note: This yeilds the call combiner. - execute_closures_in_call_combiner(elem, "on_complete", closures, - num_closures); - // If we just completed the last subchannel send batch, unref the call stack. - if (last_callback_complete) { + closures.RunClosures(calld->call_combiner); + // If this was the last subchannel send batch, unref the call stack. + if (last_send_batch_complete) { GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); } } @@ -2172,27 +2076,22 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { // Adds a closure to closures that will execute batch in the call combiner. static void add_closure_for_subchannel_batch( - call_data* calld, grpc_transport_stream_op_batch* batch, - closure_to_execute* closures, size_t* num_closures) { + grpc_call_element* elem, grpc_transport_stream_op_batch* batch, + grpc_core::CallCombinerClosureList* closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = &batch->handler_private.closure; - closure->error = GRPC_ERROR_NONE; - // If the tracer is enabled, we log a more detailed message, which - // requires dynamic allocation. This will be freed in - // start_retriable_subchannel_batches(). if (grpc_client_channel_trace.enabled()) { char* batch_str = grpc_transport_stream_op_batch_string(batch); - gpr_asprintf(const_cast<char**>(&closure->reason), - "starting batch in call combiner: %s", batch_str); + gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand, + calld, batch_str); gpr_free(batch_str); - closure->free_reason = true; - } else { - closure->reason = "start_subchannel_batch"; } + closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, + "start_subchannel_batch"); } // Adds retriable send_initial_metadata op to batch_data. @@ -2328,9 +2227,13 @@ static void add_retriable_recv_trailing_metadata_op( grpc_metadata_batch_init(&batch_data->recv_trailing_metadata); batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata = &batch_data->recv_trailing_metadata; - batch_data->batch.collect_stats = true; - batch_data->batch.payload->collect_stats.collect_stats = + batch_data->batch.payload->recv_trailing_metadata.collect_stats = &batch_data->collect_stats; + GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, batch_data, + grpc_schedule_on_exec_ctx); + batch_data->batch.payload->recv_trailing_metadata + .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready; } // Helper function used to start a recv_trailing_metadata batch. This @@ -2351,9 +2254,11 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); // Create batch_data with 2 refs, since this batch will be unreffed twice: - // once when the subchannel batch returns, and again when we actually get - // a recv_trailing_metadata op from the surface. - subchannel_batch_data* batch_data = batch_data_create(elem, 2); + // once for the recv_trailing_metadata_ready callback when the subchannel + // batch returns, and again when we actually get a recv_trailing_metadata + // op from the surface. + subchannel_batch_data* batch_data = + batch_data_create(elem, 2, false /* set_on_complete */); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); retry_state->recv_trailing_metadata_internal_batch = batch_data; // Note: This will release the call combiner. @@ -2378,7 +2283,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( "send_initial_metadata op", chand, calld); } - replay_batch_data = batch_data_create(elem, 1); + replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); add_retriable_send_initial_metadata_op(calld, retry_state, replay_batch_data); } @@ -2395,7 +2300,8 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( chand, calld); } if (replay_batch_data == nullptr) { - replay_batch_data = batch_data_create(elem, 1); + replay_batch_data = + batch_data_create(elem, 1, true /* set_on_complete */); } add_retriable_send_message_op(elem, retry_state, replay_batch_data); } @@ -2414,7 +2320,8 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( chand, calld); } if (replay_batch_data == nullptr) { - replay_batch_data = batch_data_create(elem, 1); + replay_batch_data = + batch_data_create(elem, 1, true /* set_on_complete */); } add_retriable_send_trailing_metadata_op(calld, retry_state, replay_batch_data); @@ -2426,7 +2333,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( // *num_batches as needed. static void add_subchannel_batches_for_pending_batches( grpc_call_element* elem, subchannel_call_retry_state* retry_state, - closure_to_execute* closures, size_t* num_closures) { + grpc_core::CallCombinerClosureList* closures) { call_data* calld = static_cast<call_data*>(elem->call_data); for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; @@ -2482,13 +2389,11 @@ static void add_subchannel_batches_for_pending_batches( if (retry_state->completed_recv_trailing_metadata) { subchannel_batch_data* batch_data = retry_state->recv_trailing_metadata_internal_batch; - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = &batch_data->on_complete; // Batches containing recv_trailing_metadata always succeed. - closure->error = GRPC_ERROR_NONE; - closure->reason = - "re-executing on_complete for recv_trailing_metadata " - "to propagate internally triggered result"; + closures->Add( + &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE, + "re-executing recv_trailing_metadata_ready to propagate " + "internally triggered result"); } else { batch_data_unref(retry_state->recv_trailing_metadata_internal_batch); } @@ -2500,14 +2405,19 @@ static void add_subchannel_batches_for_pending_batches( if (calld->method_params == nullptr || calld->method_params->retry_policy() == nullptr || calld->retry_committed) { - add_closure_for_subchannel_batch(calld, batch, closures, num_closures); + add_closure_for_subchannel_batch(elem, batch, closures); pending_batch_clear(calld, pending); continue; } // Create batch with the right number of callbacks. - const int num_callbacks = - 1 + batch->recv_initial_metadata + batch->recv_message; - subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks); + const bool has_send_ops = batch->send_initial_metadata || + batch->send_message || + batch->send_trailing_metadata; + const int num_callbacks = has_send_ops + batch->recv_initial_metadata + + batch->recv_message + + batch->recv_trailing_metadata; + subchannel_batch_data* batch_data = batch_data_create( + elem, num_callbacks, has_send_ops /* set_on_complete */); // Cache send ops if needed. maybe_cache_send_ops_for_batch(calld, pending); // send_initial_metadata. @@ -2534,11 +2444,9 @@ static void add_subchannel_batches_for_pending_batches( } // recv_trailing_metadata. if (batch->recv_trailing_metadata) { - GPR_ASSERT(batch->collect_stats); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); } - add_closure_for_subchannel_batch(calld, &batch_data->batch, closures, - num_closures); + add_closure_for_subchannel_batch(elem, &batch_data->batch, closures); // Track number of pending subchannel send batches. // If this is the first one, take a ref to the call stack. if (batch->send_initial_metadata || batch->send_message || @@ -2566,15 +2474,13 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); // Construct list of closures to execute, one for each pending batch. - // We can start up to 6 batches. - closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)]; - size_t num_closures = 0; + grpc_core::CallCombinerClosureList closures; // Replay previously-returned send_* ops if needed. subchannel_batch_data* replay_batch_data = maybe_create_subchannel_batch_for_replay(elem, retry_state); if (replay_batch_data != nullptr) { - add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures, - &num_closures); + add_closure_for_subchannel_batch(elem, &replay_batch_data->batch, + &closures); // Track number of pending subchannel send batches. // If this is the first one, take a ref to the call stack. if (calld->num_pending_retriable_subchannel_send_batches == 0) { @@ -2583,17 +2489,16 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { ++calld->num_pending_retriable_subchannel_send_batches; } // Now add pending batches. - add_subchannel_batches_for_pending_batches(elem, retry_state, closures, - &num_closures); + add_subchannel_batches_for_pending_batches(elem, retry_state, &closures); // Start batches on subchannel call. if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " retriable batches on subchannel_call=%p", - chand, calld, num_closures, calld->subchannel_call); + chand, calld, closures.size(), calld->subchannel_call); } - execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches", - closures, num_closures); + // Note: This will yield the call combiner. + closures.RunClosures(calld->call_combiner); } // |