aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/client_channel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc891
1 files changed, 398 insertions, 493 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index f141aabe70..8a29c80729 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -804,6 +804,7 @@ typedef struct {
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
+ grpc_closure recv_trailing_metadata_ready;
// For intercepting on_complete.
grpc_closure on_complete;
} subchannel_batch_data;
@@ -1179,35 +1180,24 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
"chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
elem->channel_data, calld, num_batches, grpc_error_string(error));
}
- grpc_transport_stream_op_batch*
- batches[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_batches = 0;
+ grpc_core::CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batches[num_batches++] = batch;
+ batch->handler_private.extra_arg = calld;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ fail_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
+ "pending_batches_fail");
pending_batch_clear(calld, pending);
}
}
- for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
- grpc_transport_stream_op_batch* batch = batches[i];
- batch->handler_private.extra_arg = calld;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- fail_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- &batch->handler_private.closure,
- GRPC_ERROR_REF(error), "pending_batches_fail");
- }
if (yield_call_combiner) {
- if (num_batches > 0) {
- // Note: This will release the call combiner.
- grpc_transport_stream_op_batch_finish_with_failure(
- batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
- } else {
- GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
- }
+ closures.RunClosures(calld->call_combiner);
+ } else {
+ closures.RunClosuresWithoutYielding(calld->call_combiner);
}
GRPC_ERROR_UNREF(error);
}
@@ -1242,30 +1232,22 @@ static void pending_batches_resume(grpc_call_element* elem) {
" pending batches on subchannel_call=%p",
chand, calld, num_batches, calld->subchannel_call);
}
- grpc_transport_stream_op_batch*
- batches[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_batches = 0;
+ grpc_core::CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batches[num_batches++] = batch;
+ batch->handler_private.extra_arg = calld->subchannel_call;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ resume_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+ "pending_batches_resume");
pending_batch_clear(calld, pending);
}
}
- for (size_t i = 1; i < num_batches; ++i) {
- grpc_transport_stream_op_batch* batch = batches[i];
- batch->handler_private.extra_arg = calld->subchannel_call;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- resume_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- &batch->handler_private.closure, GRPC_ERROR_NONE,
- "pending_batches_resume");
- }
- GPR_ASSERT(num_batches > 0);
// Note: This will release the call combiner.
- grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
+ closures.RunClosures(calld->call_combiner);
}
static void maybe_clear_pending_batch(grpc_call_element* elem,
@@ -1280,7 +1262,10 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
nullptr) &&
(!batch->recv_message ||
- batch->payload->recv_message.recv_message_ready == nullptr)) {
+ batch->payload->recv_message.recv_message_ready == nullptr) &&
+ (!batch->recv_trailing_metadata ||
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
+ nullptr)) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
calld);
@@ -1289,75 +1274,27 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
}
}
-// Returns true if all ops in the pending batch have been completed.
-static bool pending_batch_is_completed(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
- if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
- return false;
- }
- if (pending->batch->send_initial_metadata &&
- !retry_state->completed_send_initial_metadata) {
- return false;
- }
- if (pending->batch->send_message &&
- retry_state->completed_send_message_count <
- calld->send_messages->size()) {
- return false;
- }
- if (pending->batch->send_trailing_metadata &&
- !retry_state->completed_send_trailing_metadata) {
- return false;
- }
- if (pending->batch->recv_initial_metadata &&
- !retry_state->completed_recv_initial_metadata) {
- return false;
- }
- if (pending->batch->recv_message &&
- retry_state->completed_recv_message_count <
- retry_state->started_recv_message_count) {
- return false;
- }
- if (pending->batch->recv_trailing_metadata &&
- !retry_state->completed_recv_trailing_metadata) {
- return false;
- }
- return true;
-}
-
-// Returns true if any op in the batch was not yet started.
-static bool pending_batch_is_unstarted(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
- if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
- return false;
- }
- if (pending->batch->send_initial_metadata &&
- !retry_state->started_send_initial_metadata) {
- return true;
- }
- if (pending->batch->send_message &&
- retry_state->started_send_message_count < calld->send_messages->size()) {
- return true;
- }
- if (pending->batch->send_trailing_metadata &&
- !retry_state->started_send_trailing_metadata) {
- return true;
- }
- if (pending->batch->recv_initial_metadata &&
- !retry_state->started_recv_initial_metadata) {
- return true;
- }
- if (pending->batch->recv_message &&
- retry_state->completed_recv_message_count ==
- retry_state->started_recv_message_count) {
- return true;
- }
- if (pending->batch->recv_trailing_metadata &&
- !retry_state->started_recv_trailing_metadata) {
- return true;
+// Returns a pointer to the first pending batch for which predicate(batch)
+// returns true, or null if not found.
+template <typename Predicate>
+static pending_batch* pending_batch_find(grpc_call_element* elem,
+ const char* log_message,
+ Predicate predicate) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ grpc_transport_stream_op_batch* batch = pending->batch;
+ if (batch != nullptr && predicate(batch)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
+ calld, log_message, i);
+ }
+ return pending;
+ }
}
- return false;
+ return nullptr;
}
//
@@ -1544,8 +1481,13 @@ static bool maybe_retry(grpc_call_element* elem,
// subchannel_batch_data
//
+// Creates a subchannel_batch_data object on the call's arena with the
+// specified refcount. If set_on_complete is true, the batch's
+// on_complete callback will be set to point to on_complete();
+// otherwise, the batch's on_complete callback will be null.
static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
- int refcount) {
+ int refcount,
+ bool set_on_complete) {
call_data* calld = static_cast<call_data*>(elem->call_data);
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
@@ -1558,9 +1500,11 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
batch_data->batch.payload = &retry_state->batch_payload;
gpr_ref_init(&batch_data->refs, refcount);
- GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
- grpc_schedule_on_exec_ctx);
- batch_data->batch.on_complete = &batch_data->on_complete;
+ if (set_on_complete) {
+ GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
+ grpc_schedule_on_exec_ctx);
+ batch_data->batch.on_complete = &batch_data->on_complete;
+ }
GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
return batch_data;
}
@@ -1593,26 +1537,14 @@ static void batch_data_unref(subchannel_batch_data* batch_data) {
static void invoke_recv_initial_metadata_callback(void* arg,
grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
- channel_data* chand =
- static_cast<channel_data*>(batch_data->elem->channel_data);
- call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending batch.
- pending_batch* pending = nullptr;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
- if (batch != nullptr && batch->recv_initial_metadata &&
- batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
- nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
- "pending batch at index %" PRIuPTR,
- chand, calld, i);
- }
- pending = &calld->pending_batches[i];
- break;
- }
- }
+ pending_batch* pending = pending_batch_find(
+ batch_data->elem, "invoking recv_initial_metadata_ready for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_initial_metadata &&
+ batch->payload->recv_initial_metadata
+ .recv_initial_metadata_ready != nullptr;
+ });
GPR_ASSERT(pending != nullptr);
// Return metadata.
grpc_metadata_batch_move(
@@ -1648,10 +1580,19 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
+ retry_state->completed_recv_initial_metadata = true;
+ // If a retry was already dispatched, then we're not going to use the
+ // result of this recv_initial_metadata op, so do nothing.
+ if (retry_state->retry_dispatched) {
+ GRPC_CALL_COMBINER_STOP(
+ calld->call_combiner,
+ "recv_initial_metadata_ready after retry dispatched");
+ return;
+ }
// If we got an error or a Trailers-Only response and have not yet gotten
- // the recv_trailing_metadata on_complete callback, then defer
- // propagating this callback back to the surface. We can evaluate whether
- // to retry when recv_trailing_metadata comes back.
+ // the recv_trailing_metadata_ready callback, then defer propagating this
+ // callback back to the surface. We can evaluate whether to retry when
+ // recv_trailing_metadata comes back.
if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1676,9 +1617,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
}
// Received valid initial metadata, so commit the call.
retry_commit(elem, retry_state);
+ // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_initial_metadata_callback(batch_data, error);
- GRPC_ERROR_UNREF(error);
}
//
@@ -1688,25 +1629,13 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
// Invokes recv_message_ready for a subchannel batch.
static void invoke_recv_message_callback(void* arg, grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
- channel_data* chand =
- static_cast<channel_data*>(batch_data->elem->channel_data);
- call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending op.
- pending_batch* pending = nullptr;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
- if (batch != nullptr && batch->recv_message &&
- batch->payload->recv_message.recv_message_ready != nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: invoking recv_message_ready for "
- "pending batch at index %" PRIuPTR,
- chand, calld, i);
- }
- pending = &calld->pending_batches[i];
- break;
- }
- }
+ pending_batch* pending = pending_batch_find(
+ batch_data->elem, "invoking recv_message_ready for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_message &&
+ batch->payload->recv_message.recv_message_ready != nullptr;
+ });
GPR_ASSERT(pending != nullptr);
// Return payload.
*pending->batch->payload->recv_message.recv_message =
@@ -1738,10 +1667,18 @@ static void recv_message_ready(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
+ ++retry_state->completed_recv_message_count;
+ // If a retry was already dispatched, then we're not going to use the
+ // result of this recv_message op, so do nothing.
+ if (retry_state->retry_dispatched) {
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+ "recv_message_ready after retry dispatched");
+ return;
+ }
// If we got an error or the payload was nullptr and we have not yet gotten
- // the recv_trailing_metadata on_complete callback, then defer
- // propagating this callback back to the surface. We can evaluate whether
- // to retry when recv_trailing_metadata comes back.
+ // the recv_trailing_metadata_ready callback, then defer propagating this
+ // callback back to the surface. We can evaluate whether to retry when
+ // recv_trailing_metadata comes back.
if (GPR_UNLIKELY(
(batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1764,133 +1701,268 @@ static void recv_message_ready(void* arg, grpc_error* error) {
}
// Received a valid message, so commit the call.
retry_commit(elem, retry_state);
+ // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_message_callback(batch_data, error);
- GRPC_ERROR_UNREF(error);
}
//
-// list of closures to execute in call combiner
+// recv_trailing_metadata handling
//
-// Represents a closure that needs to run in the call combiner as part of
-// starting or completing a batch.
-typedef struct {
- grpc_closure* closure;
- grpc_error* error;
- const char* reason;
- bool free_reason = false;
-} closure_to_execute;
-
-static void execute_closures_in_call_combiner(grpc_call_element* elem,
- const char* caller,
- closure_to_execute* closures,
- size_t num_closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+// Sets *status and *server_pushback_md based on batch_data and error.
+static void get_call_status(subchannel_batch_data* batch_data,
+ grpc_error* error, grpc_status_code* status,
+ grpc_mdelem** server_pushback_md) {
+ grpc_call_element* elem = batch_data->elem;
call_data* calld = static_cast<call_data*>(elem->call_data);
- // Note that the call combiner will be yielded for each closure that
- // we schedule. We're already running in the call combiner, so one of
- // the closures can be scheduled directly, but the others will
- // have to re-enter the call combiner.
- if (num_closures > 0) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
- calld, caller, closures[0].reason);
- }
- GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
- if (closures[0].free_reason) {
- gpr_free(const_cast<char*>(closures[0].reason));
- }
- for (size_t i = 1; i < num_closures; ++i) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: %s starting closure in call combiner: %s",
- chand, calld, caller, closures[i].reason);
- }
- GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
- closures[i].error, closures[i].reason);
- if (closures[i].free_reason) {
- gpr_free(const_cast<char*>(closures[i].reason));
- }
- }
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
+ nullptr);
} else {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
- calld, caller);
+ grpc_metadata_batch* md_batch =
+ batch_data->batch.payload->recv_trailing_metadata
+ .recv_trailing_metadata;
+ GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+ *status =
+ grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
+ if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+ *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
}
- GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
}
+ GRPC_ERROR_UNREF(error);
}
-//
-// on_complete callback handling
-//
-
-// Updates retry_state to reflect the ops completed in batch_data.
-static void update_retry_state_for_completed_batch(
- subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state) {
- if (batch_data->batch.send_initial_metadata) {
- retry_state->completed_send_initial_metadata = true;
- }
- if (batch_data->batch.send_message) {
- ++retry_state->completed_send_message_count;
- }
- if (batch_data->batch.send_trailing_metadata) {
- retry_state->completed_send_trailing_metadata = true;
- }
- if (batch_data->batch.recv_initial_metadata) {
- retry_state->completed_recv_initial_metadata = true;
- }
- if (batch_data->batch.recv_message) {
- ++retry_state->completed_recv_message_count;
- }
- if (batch_data->batch.recv_trailing_metadata) {
- retry_state->completed_recv_trailing_metadata = true;
+// Adds recv_trailing_metadata_ready closure to closures.
+static void add_closure_for_recv_trailing_metadata_ready(
+ grpc_call_element* elem, subchannel_batch_data* batch_data,
+ grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
+ // Find pending batch.
+ pending_batch* pending = pending_batch_find(
+ elem, "invoking recv_trailing_metadata for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_trailing_metadata &&
+ batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready != nullptr;
+ });
+ // If we generated the recv_trailing_metadata op internally via
+ // start_internal_recv_trailing_metadata(), then there will be no
+ // pending batch.
+ if (pending == nullptr) {
+ GRPC_ERROR_UNREF(error);
+ return;
}
+ // Return metadata.
+ grpc_metadata_batch_move(
+ &batch_data->recv_trailing_metadata,
+ pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
+ // Add closure.
+ closures->Add(pending->batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ error, "recv_trailing_metadata_ready for pending batch");
+ // Update bookkeeping.
+ pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ nullptr;
+ maybe_clear_pending_batch(elem, pending);
}
// Adds any necessary closures for deferred recv_initial_metadata and
-// recv_message callbacks to closures, updating *num_closures as needed.
+// recv_message callbacks to closures.
static void add_closures_for_deferred_recv_callbacks(
subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
- closure_to_execute* closures, size_t* num_closures) {
+ grpc_core::CallCombinerClosureList* closures) {
if (batch_data->batch.recv_trailing_metadata) {
// Add closure for deferred recv_initial_metadata_ready.
if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
nullptr)) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->recv_initial_metadata_ready,
- invoke_recv_initial_metadata_callback,
- retry_state->recv_initial_metadata_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_initial_metadata_error;
- closure->reason = "resuming recv_initial_metadata_ready";
+ GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
+ invoke_recv_initial_metadata_callback,
+ retry_state->recv_initial_metadata_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->recv_initial_metadata_ready,
+ retry_state->recv_initial_metadata_error,
+ "resuming recv_initial_metadata_ready");
retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
}
// Add closure for deferred recv_message_ready.
if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
nullptr)) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->recv_message_ready, invoke_recv_message_callback,
- retry_state->recv_message_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_message_error;
- closure->reason = "resuming recv_message_ready";
+ GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
+ invoke_recv_message_callback,
+ retry_state->recv_message_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->recv_message_ready,
+ retry_state->recv_message_error,
+ "resuming recv_message_ready");
retry_state->recv_message_ready_deferred_batch = nullptr;
}
}
}
+// Returns true if any op in the batch was not yet started.
+// Only looks at send ops, since recv ops are always started immediately.
+static bool pending_batch_is_unstarted(
+ pending_batch* pending, call_data* calld,
+ subchannel_call_retry_state* retry_state) {
+ if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+ return false;
+ }
+ if (pending->batch->send_initial_metadata &&
+ !retry_state->started_send_initial_metadata) {
+ return true;
+ }
+ if (pending->batch->send_message &&
+ retry_state->started_send_message_count < calld->send_messages->size()) {
+ return true;
+ }
+ if (pending->batch->send_trailing_metadata &&
+ !retry_state->started_send_trailing_metadata) {
+ return true;
+ }
+ return false;
+}
+
+// For any pending batch containing an op that has not yet been started,
+// adds the pending batch's completion closures to closures.
+static void add_closures_to_fail_unstarted_pending_batches(
+ grpc_call_element* elem, subchannel_call_retry_state* retry_state,
+ grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ if (pending_batch_is_unstarted(pending, calld, retry_state)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: failing unstarted pending batch at index "
+ "%" PRIuPTR,
+ chand, calld, i);
+ }
+ closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
+ "failing on_complete for pending batch");
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+ }
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+// Runs necessary closures upon completion of a call attempt.
+static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
+ grpc_error* error) {
+ grpc_call_element* elem = batch_data->elem;
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
+ // Construct list of closures to execute.
+ grpc_core::CallCombinerClosureList closures;
+ // First, add closure for recv_trailing_metadata_ready.
+ add_closure_for_recv_trailing_metadata_ready(
+ elem, batch_data, GRPC_ERROR_REF(error), &closures);
+ // If there are deferred recv_initial_metadata_ready or recv_message_ready
+ // callbacks, add them to closures.
+ add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
+ // Add closures to fail any pending batches that have not yet been started.
+ add_closures_to_fail_unstarted_pending_batches(
+ elem, retry_state, GRPC_ERROR_REF(error), &closures);
+ // Don't need batch_data anymore.
+ batch_data_unref(batch_data);
+ // Schedule all of the closures identified above.
+ // Note: This will release the call combiner.
+ closures.RunClosures(calld->call_combiner);
+ GRPC_ERROR_UNREF(error);
+}
+
+// Intercepts recv_trailing_metadata_ready callback for retries.
+// Commits the call and returns the trailing metadata up the stack.
+static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
+ subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+ grpc_call_element* elem = batch_data->elem;
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
+ chand, calld, grpc_error_string(error));
+ }
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
+ retry_state->completed_recv_trailing_metadata = true;
+ // Get the call's status and check for server pushback metadata.
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_mdelem* server_pushback_md = nullptr;
+ get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
+ &server_pushback_md);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
+ calld, grpc_status_code_to_string(status));
+ }
+ // Check if we should retry.
+ if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+ // Unref batch_data for deferred recv_initial_metadata_ready or
+ // recv_message_ready callbacks, if any.
+ if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+ }
+ if (retry_state->recv_message_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_message_error);
+ }
+ batch_data_unref(batch_data);
+ return;
+ }
+ // Not retrying, so commit the call.
+ retry_commit(elem, retry_state);
+ // Run any necessary closures.
+ run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
+}
+
+//
+// on_complete callback handling
+//
+
+// Adds the on_complete closure for the pending batch completed in
+// batch_data to closures.
+static void add_closure_for_completed_pending_batch(
+ grpc_call_element* elem, subchannel_batch_data* batch_data,
+ subchannel_call_retry_state* retry_state, grpc_error* error,
+ grpc_core::CallCombinerClosureList* closures) {
+ pending_batch* pending = pending_batch_find(
+ elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
+ // Match the pending batch with the same set of send ops as the
+ // subchannel batch we've just completed.
+ return batch->on_complete != nullptr &&
+ batch_data->batch.send_initial_metadata ==
+ batch->send_initial_metadata &&
+ batch_data->batch.send_message == batch->send_message &&
+ batch_data->batch.send_trailing_metadata ==
+ batch->send_trailing_metadata;
+ });
+ // If batch_data is a replay batch, then there will be no pending
+ // batch to complete.
+ if (pending == nullptr) {
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ // Add closure.
+ closures->Add(pending->batch->on_complete, error,
+ "on_complete for pending batch");
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+}
+
// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
-// start_retriable_subchannel_batches(), updating *num_closures as needed.
+// start_retriable_subchannel_batches().
static void add_closures_for_replay_or_pending_send_ops(
grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, closure_to_execute* closures,
- size_t* num_closures) {
+ subchannel_call_retry_state* retry_state,
+ grpc_core::CallCombinerClosureList* closures) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
bool have_pending_send_message_ops =
@@ -1916,93 +1988,12 @@ static void add_closures_for_replay_or_pending_send_ops(
"chand=%p calld=%p: starting next batch for pending send op(s)",
chand, calld);
}
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->batch.handler_private.closure,
- start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
- closure->error = GRPC_ERROR_NONE;
- closure->reason = "starting next batch for send_* op(s)";
- }
-}
-
-// For any pending batch completed in batch_data, adds the necessary
-// completion closures to closures, updating *num_closures as needed.
-static void add_closures_for_completed_pending_batches(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, grpc_error* error,
- closure_to_execute* closures, size_t* num_closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_completed(pending, calld, retry_state)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
- chand, calld, i);
- }
- // Copy the trailing metadata to return it to the surface.
- if (batch_data->batch.recv_trailing_metadata) {
- grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
- pending->batch->payload->recv_trailing_metadata
- .recv_trailing_metadata);
- }
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->on_complete;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "on_complete for pending batch";
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
- }
+ GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
+ start_retriable_subchannel_batches, elem,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
+ "starting next batch for send_* op(s)");
}
- GRPC_ERROR_UNREF(error);
-}
-
-// For any pending batch containing an op that has not yet been started,
-// adds the pending batch's completion closures to closures, updating
-// *num_closures as needed.
-static void add_closures_to_fail_unstarted_pending_batches(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_unstarted(pending, calld, retry_state)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: failing unstarted pending batch at index "
- "%" PRIuPTR,
- chand, calld, i);
- }
- if (pending->batch->recv_initial_metadata) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->payload->recv_initial_metadata
- .recv_initial_metadata_ready;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason =
- "failing recv_initial_metadata_ready for pending batch";
- pending->batch->payload->recv_initial_metadata
- .recv_initial_metadata_ready = nullptr;
- }
- if (pending->batch->recv_message) {
- *pending->batch->payload->recv_message.recv_message = nullptr;
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure =
- pending->batch->payload->recv_message.recv_message_ready;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "failing recv_message_ready for pending batch";
- pending->batch->payload->recv_message.recv_message_ready = nullptr;
- }
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->on_complete;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "failing on_complete for pending batch";
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
- }
- }
- GRPC_ERROR_UNREF(error);
}
// Callback used to intercept on_complete from subchannel calls.
@@ -2022,136 +2013,49 @@ static void on_complete(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
- // If we have previously completed recv_trailing_metadata, then the
- // call is finished.
- bool call_finished = retry_state->completed_recv_trailing_metadata;
- // Record whether we were already committed before receiving this callback.
- const bool previously_committed = calld->retry_committed;
// Update bookkeeping in retry_state.
- update_retry_state_for_completed_batch(batch_data, retry_state);
- if (call_finished) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
- calld);
- }
- } else {
- // Check if this batch finished the call, and if so, get its status.
- // The call is finished if either (a) this callback was invoked with
- // an error or (b) we receive status.
- grpc_status_code status = GRPC_STATUS_OK;
- grpc_mdelem* server_pushback_md = nullptr;
- if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a).
- call_finished = true;
- grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
- nullptr);
- } else if (batch_data->batch.recv_trailing_metadata) { // Case (b).
- call_finished = true;
- grpc_metadata_batch* md_batch =
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata;
- GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
- status = grpc_get_status_code_from_metadata(
- md_batch->idx.named.grpc_status->md);
- if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
- server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
- }
- }
- // If the call just finished, check if we should retry.
- if (call_finished) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
- calld, grpc_status_code_to_string(status));
- }
- if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
- // Unref batch_data for deferred recv_initial_metadata_ready or
- // recv_message_ready callbacks, if any.
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_initial_metadata_ready_deferred_batch !=
- nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
- }
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_message_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_message_error);
- }
- // Track number of pending subchannel send batches and determine if
- // this was the last one.
- bool last_callback_complete = false;
- if (batch_data->batch.send_initial_metadata ||
- batch_data->batch.send_message ||
- batch_data->batch.send_trailing_metadata) {
- --calld->num_pending_retriable_subchannel_send_batches;
- last_callback_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
- }
- batch_data_unref(batch_data);
- // If we just completed the last subchannel send batch, unref the
- // call stack.
- if (last_callback_complete) {
- GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
- }
- return;
- }
- // Not retrying, so commit the call.
- retry_commit(elem, retry_state);
- }
+ if (batch_data->batch.send_initial_metadata) {
+ retry_state->completed_send_initial_metadata = true;
+ }
+ if (batch_data->batch.send_message) {
+ ++retry_state->completed_send_message_count;
}
- // If we were already committed before receiving this callback, free
- // cached data for send ops that we've just completed. (If the call has
- // just now finished, the call to retry_commit() above will have freed all
- // cached send ops, so we don't need to do it here.)
- if (previously_committed) {
+ if (batch_data->batch.send_trailing_metadata) {
+ retry_state->completed_send_trailing_metadata = true;
+ }
+ // If the call is committed, free cached data for send ops that we've just
+ // completed.
+ if (calld->retry_committed) {
free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
}
- // Call not being retried.
// Construct list of closures to execute.
- // Max number of closures is number of pending batches plus one for
- // each of:
- // - recv_initial_metadata_ready (either deferred or unstarted)
- // - recv_message_ready (either deferred or unstarted)
- // - starting a new batch for pending send ops
- closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
- size_t num_closures = 0;
- // If there are deferred recv_initial_metadata_ready or recv_message_ready
- // callbacks, add them to closures.
- add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
- &num_closures);
- // Find pending batches whose ops are now complete and add their
- // on_complete callbacks to closures.
- add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
- GRPC_ERROR_REF(error), closures,
- &num_closures);
- // Add closures to handle any pending batches that have not yet been started.
- // If the call is finished, we fail these batches; otherwise, we add a
- // callback to start_retriable_subchannel_batches() to start them on
- // the subchannel call.
- if (call_finished) {
- add_closures_to_fail_unstarted_pending_batches(
- elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
- } else {
- add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
- closures, &num_closures);
+ grpc_core::CallCombinerClosureList closures;
+ // If a retry was already dispatched, that means we saw
+ // recv_trailing_metadata before this, so we do nothing here.
+ // Otherwise, invoke the callback to return the result to the surface.
+ if (!retry_state->retry_dispatched) {
+ // Add closure for the completed pending batch, if any.
+ add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
+ GRPC_ERROR_REF(error), &closures);
+ // If needed, add a callback to start any replay or pending send ops on
+ // the subchannel call.
+ if (!retry_state->completed_recv_trailing_metadata) {
+ add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
+ &closures);
+ }
}
// Track number of pending subchannel send batches and determine if this
// was the last one.
- bool last_callback_complete = false;
- if (batch_data->batch.send_initial_metadata ||
- batch_data->batch.send_message ||
- batch_data->batch.send_trailing_metadata) {
- --calld->num_pending_retriable_subchannel_send_batches;
- last_callback_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
- }
+ --calld->num_pending_retriable_subchannel_send_batches;
+ const bool last_send_batch_complete =
+ calld->num_pending_retriable_subchannel_send_batches == 0;
// Don't need batch_data anymore.
batch_data_unref(batch_data);
// Schedule all of the closures identified above.
// Note: This yeilds the call combiner.
- execute_closures_in_call_combiner(elem, "on_complete", closures,
- num_closures);
- // If we just completed the last subchannel send batch, unref the call stack.
- if (last_callback_complete) {
+ closures.RunClosures(calld->call_combiner);
+ // If this was the last subchannel send batch, unref the call stack.
+ if (last_send_batch_complete) {
GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
}
}
@@ -2172,27 +2076,22 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
// Adds a closure to closures that will execute batch in the call combiner.
static void add_closure_for_subchannel_batch(
- call_data* calld, grpc_transport_stream_op_batch* batch,
- closure_to_execute* closures, size_t* num_closures) {
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
+ grpc_core::CallCombinerClosureList* closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
start_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = &batch->handler_private.closure;
- closure->error = GRPC_ERROR_NONE;
- // If the tracer is enabled, we log a more detailed message, which
- // requires dynamic allocation. This will be freed in
- // start_retriable_subchannel_batches().
if (grpc_client_channel_trace.enabled()) {
char* batch_str = grpc_transport_stream_op_batch_string(batch);
- gpr_asprintf(const_cast<char**>(&closure->reason),
- "starting batch in call combiner: %s", batch_str);
+ gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
+ calld, batch_str);
gpr_free(batch_str);
- closure->free_reason = true;
- } else {
- closure->reason = "start_subchannel_batch";
}
+ closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+ "start_subchannel_batch");
}
// Adds retriable send_initial_metadata op to batch_data.
@@ -2328,9 +2227,13 @@ static void add_retriable_recv_trailing_metadata_op(
grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
&batch_data->recv_trailing_metadata;
- batch_data->batch.collect_stats = true;
- batch_data->batch.payload->collect_stats.collect_stats =
+ batch_data->batch.payload->recv_trailing_metadata.collect_stats =
&batch_data->collect_stats;
+ GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, batch_data,
+ grpc_schedule_on_exec_ctx);
+ batch_data->batch.payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
}
// Helper function used to start a recv_trailing_metadata batch. This
@@ -2351,9 +2254,11 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Create batch_data with 2 refs, since this batch will be unreffed twice:
- // once when the subchannel batch returns, and again when we actually get
- // a recv_trailing_metadata op from the surface.
- subchannel_batch_data* batch_data = batch_data_create(elem, 2);
+ // once for the recv_trailing_metadata_ready callback when the subchannel
+ // batch returns, and again when we actually get a recv_trailing_metadata
+ // op from the surface.
+ subchannel_batch_data* batch_data =
+ batch_data_create(elem, 2, false /* set_on_complete */);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
retry_state->recv_trailing_metadata_internal_batch = batch_data;
// Note: This will release the call combiner.
@@ -2378,7 +2283,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
"send_initial_metadata op",
chand, calld);
}
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
add_retriable_send_initial_metadata_op(calld, retry_state,
replay_batch_data);
}
@@ -2395,7 +2300,8 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data =
+ batch_data_create(elem, 1, true /* set_on_complete */);
}
add_retriable_send_message_op(elem, retry_state, replay_batch_data);
}
@@ -2414,7 +2320,8 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data =
+ batch_data_create(elem, 1, true /* set_on_complete */);
}
add_retriable_send_trailing_metadata_op(calld, retry_state,
replay_batch_data);
@@ -2426,7 +2333,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// *num_batches as needed.
static void add_subchannel_batches_for_pending_batches(
grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- closure_to_execute* closures, size_t* num_closures) {
+ grpc_core::CallCombinerClosureList* closures) {
call_data* calld = static_cast<call_data*>(elem->call_data);
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
@@ -2482,13 +2389,11 @@ static void add_subchannel_batches_for_pending_batches(
if (retry_state->completed_recv_trailing_metadata) {
subchannel_batch_data* batch_data =
retry_state->recv_trailing_metadata_internal_batch;
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = &batch_data->on_complete;
// Batches containing recv_trailing_metadata always succeed.
- closure->error = GRPC_ERROR_NONE;
- closure->reason =
- "re-executing on_complete for recv_trailing_metadata "
- "to propagate internally triggered result";
+ closures->Add(
+ &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
+ "re-executing recv_trailing_metadata_ready to propagate "
+ "internally triggered result");
} else {
batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
}
@@ -2500,14 +2405,19 @@ static void add_subchannel_batches_for_pending_batches(
if (calld->method_params == nullptr ||
calld->method_params->retry_policy() == nullptr ||
calld->retry_committed) {
- add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
+ add_closure_for_subchannel_batch(elem, batch, closures);
pending_batch_clear(calld, pending);
continue;
}
// Create batch with the right number of callbacks.
- const int num_callbacks =
- 1 + batch->recv_initial_metadata + batch->recv_message;
- subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
+ const bool has_send_ops = batch->send_initial_metadata ||
+ batch->send_message ||
+ batch->send_trailing_metadata;
+ const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
+ batch->recv_message +
+ batch->recv_trailing_metadata;
+ subchannel_batch_data* batch_data = batch_data_create(
+ elem, num_callbacks, has_send_ops /* set_on_complete */);
// Cache send ops if needed.
maybe_cache_send_ops_for_batch(calld, pending);
// send_initial_metadata.
@@ -2534,11 +2444,9 @@ static void add_subchannel_batches_for_pending_batches(
}
// recv_trailing_metadata.
if (batch->recv_trailing_metadata) {
- GPR_ASSERT(batch->collect_stats);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
}
- add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
- num_closures);
+ add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (batch->send_initial_metadata || batch->send_message ||
@@ -2566,15 +2474,13 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Construct list of closures to execute, one for each pending batch.
- // We can start up to 6 batches.
- closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_closures = 0;
+ grpc_core::CallCombinerClosureList closures;
// Replay previously-returned send_* ops if needed.
subchannel_batch_data* replay_batch_data =
maybe_create_subchannel_batch_for_replay(elem, retry_state);
if (replay_batch_data != nullptr) {
- add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
- &num_closures);
+ add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
+ &closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (calld->num_pending_retriable_subchannel_send_batches == 0) {
@@ -2583,17 +2489,16 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
++calld->num_pending_retriable_subchannel_send_batches;
}
// Now add pending batches.
- add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
- &num_closures);
+ add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
// Start batches on subchannel call.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" retriable batches on subchannel_call=%p",
- chand, calld, num_closures, calld->subchannel_call);
+ chand, calld, closures.size(), calld->subchannel_call);
}
- execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
- closures, num_closures);
+ // Note: This will yield the call combiner.
+ closures.RunClosures(calld->call_combiner);
}
//