aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/client_channel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc891
1 files changed, 493 insertions, 398 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 34ea97e23e..ea6775a8d8 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -817,7 +817,6 @@ typedef struct {
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
- grpc_closure recv_trailing_metadata_ready;
// For intercepting on_complete.
grpc_closure on_complete;
} subchannel_batch_data;
@@ -1193,24 +1192,35 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
"chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
elem->channel_data, calld, num_batches, grpc_error_string(error));
}
- grpc_core::CallCombinerClosureList closures;
+ grpc_transport_stream_op_batch*
+ batches[GPR_ARRAY_SIZE(calld->pending_batches)];
+ size_t num_batches = 0;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batch->handler_private.extra_arg = calld;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- fail_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
- "pending_batches_fail");
+ batches[num_batches++] = batch;
pending_batch_clear(calld, pending);
}
}
+ for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
+ grpc_transport_stream_op_batch* batch = batches[i];
+ batch->handler_private.extra_arg = calld;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ fail_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(calld->call_combiner,
+ &batch->handler_private.closure,
+ GRPC_ERROR_REF(error), "pending_batches_fail");
+ }
if (yield_call_combiner) {
- closures.RunClosures(calld->call_combiner);
- } else {
- closures.RunClosuresWithoutYielding(calld->call_combiner);
+ if (num_batches > 0) {
+ // Note: This will release the call combiner.
+ grpc_transport_stream_op_batch_finish_with_failure(
+ batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
+ } else {
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
+ }
}
GRPC_ERROR_UNREF(error);
}
@@ -1245,22 +1255,30 @@ static void pending_batches_resume(grpc_call_element* elem) {
" pending batches on subchannel_call=%p",
chand, calld, num_batches, calld->subchannel_call);
}
- grpc_core::CallCombinerClosureList closures;
+ grpc_transport_stream_op_batch*
+ batches[GPR_ARRAY_SIZE(calld->pending_batches)];
+ size_t num_batches = 0;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batch->handler_private.extra_arg = calld->subchannel_call;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- resume_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
- "pending_batches_resume");
+ batches[num_batches++] = batch;
pending_batch_clear(calld, pending);
}
}
+ for (size_t i = 1; i < num_batches; ++i) {
+ grpc_transport_stream_op_batch* batch = batches[i];
+ batch->handler_private.extra_arg = calld->subchannel_call;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ resume_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(calld->call_combiner,
+ &batch->handler_private.closure, GRPC_ERROR_NONE,
+ "pending_batches_resume");
+ }
+ GPR_ASSERT(num_batches > 0);
// Note: This will release the call combiner.
- closures.RunClosures(calld->call_combiner);
+ grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
}
static void maybe_clear_pending_batch(grpc_call_element* elem,
@@ -1275,10 +1293,7 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
nullptr) &&
(!batch->recv_message ||
- batch->payload->recv_message.recv_message_ready == nullptr) &&
- (!batch->recv_trailing_metadata ||
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
- nullptr)) {
+ batch->payload->recv_message.recv_message_ready == nullptr)) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
calld);
@@ -1287,27 +1302,75 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
}
}
-// Returns a pointer to the first pending batch for which predicate(batch)
-// returns true, or null if not found.
-template <typename Predicate>
-static pending_batch* pending_batch_find(grpc_call_element* elem,
- const char* log_message,
- Predicate predicate) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- grpc_transport_stream_op_batch* batch = pending->batch;
- if (batch != nullptr && predicate(batch)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
- calld, log_message, i);
- }
- return pending;
- }
+// Returns true if all ops in the pending batch have been completed.
+static bool pending_batch_is_completed(
+ pending_batch* pending, call_data* calld,
+ subchannel_call_retry_state* retry_state) {
+ if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+ return false;
+ }
+ if (pending->batch->send_initial_metadata &&
+ !retry_state->completed_send_initial_metadata) {
+ return false;
+ }
+ if (pending->batch->send_message &&
+ retry_state->completed_send_message_count <
+ calld->send_messages->size()) {
+ return false;
+ }
+ if (pending->batch->send_trailing_metadata &&
+ !retry_state->completed_send_trailing_metadata) {
+ return false;
+ }
+ if (pending->batch->recv_initial_metadata &&
+ !retry_state->completed_recv_initial_metadata) {
+ return false;
+ }
+ if (pending->batch->recv_message &&
+ retry_state->completed_recv_message_count <
+ retry_state->started_recv_message_count) {
+ return false;
+ }
+ if (pending->batch->recv_trailing_metadata &&
+ !retry_state->completed_recv_trailing_metadata) {
+ return false;
}
- return nullptr;
+ return true;
+}
+
+// Returns true if any op in the batch was not yet started.
+static bool pending_batch_is_unstarted(
+ pending_batch* pending, call_data* calld,
+ subchannel_call_retry_state* retry_state) {
+ if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+ return false;
+ }
+ if (pending->batch->send_initial_metadata &&
+ !retry_state->started_send_initial_metadata) {
+ return true;
+ }
+ if (pending->batch->send_message &&
+ retry_state->started_send_message_count < calld->send_messages->size()) {
+ return true;
+ }
+ if (pending->batch->send_trailing_metadata &&
+ !retry_state->started_send_trailing_metadata) {
+ return true;
+ }
+ if (pending->batch->recv_initial_metadata &&
+ !retry_state->started_recv_initial_metadata) {
+ return true;
+ }
+ if (pending->batch->recv_message &&
+ retry_state->completed_recv_message_count ==
+ retry_state->started_recv_message_count) {
+ return true;
+ }
+ if (pending->batch->recv_trailing_metadata &&
+ !retry_state->started_recv_trailing_metadata) {
+ return true;
+ }
+ return false;
}
//
@@ -1494,13 +1557,8 @@ static bool maybe_retry(grpc_call_element* elem,
// subchannel_batch_data
//
-// Creates a subchannel_batch_data object on the call's arena with the
-// specified refcount. If set_on_complete is true, the batch's
-// on_complete callback will be set to point to on_complete();
-// otherwise, the batch's on_complete callback will be null.
static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
- int refcount,
- bool set_on_complete) {
+ int refcount) {
call_data* calld = static_cast<call_data*>(elem->call_data);
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
@@ -1513,11 +1571,9 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
batch_data->batch.payload = &retry_state->batch_payload;
gpr_ref_init(&batch_data->refs, refcount);
- if (set_on_complete) {
- GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
- grpc_schedule_on_exec_ctx);
- batch_data->batch.on_complete = &batch_data->on_complete;
- }
+ GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
+ grpc_schedule_on_exec_ctx);
+ batch_data->batch.on_complete = &batch_data->on_complete;
GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
return batch_data;
}
@@ -1550,14 +1606,26 @@ static void batch_data_unref(subchannel_batch_data* batch_data) {
static void invoke_recv_initial_metadata_callback(void* arg,
grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+ channel_data* chand =
+ static_cast<channel_data*>(batch_data->elem->channel_data);
+ call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending batch.
- pending_batch* pending = pending_batch_find(
- batch_data->elem, "invoking recv_initial_metadata_ready for",
- [](grpc_transport_stream_op_batch* batch) {
- return batch->recv_initial_metadata &&
- batch->payload->recv_initial_metadata
- .recv_initial_metadata_ready != nullptr;
- });
+ pending_batch* pending = nullptr;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
+ if (batch != nullptr && batch->recv_initial_metadata &&
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
+ nullptr) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
+ "pending batch at index %" PRIuPTR,
+ chand, calld, i);
+ }
+ pending = &calld->pending_batches[i];
+ break;
+ }
+ }
GPR_ASSERT(pending != nullptr);
// Return metadata.
grpc_metadata_batch_move(
@@ -1593,19 +1661,10 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
- retry_state->completed_recv_initial_metadata = true;
- // If a retry was already dispatched, then we're not going to use the
- // result of this recv_initial_metadata op, so do nothing.
- if (retry_state->retry_dispatched) {
- GRPC_CALL_COMBINER_STOP(
- calld->call_combiner,
- "recv_initial_metadata_ready after retry dispatched");
- return;
- }
// If we got an error or a Trailers-Only response and have not yet gotten
- // the recv_trailing_metadata_ready callback, then defer propagating this
- // callback back to the surface. We can evaluate whether to retry when
- // recv_trailing_metadata comes back.
+ // the recv_trailing_metadata on_complete callback, then defer
+ // propagating this callback back to the surface. We can evaluate whether
+ // to retry when recv_trailing_metadata comes back.
if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1630,9 +1689,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
}
// Received valid initial metadata, so commit the call.
retry_commit(elem, retry_state);
- // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_initial_metadata_callback(batch_data, error);
+ GRPC_ERROR_UNREF(error);
}
//
@@ -1642,13 +1701,25 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
// Invokes recv_message_ready for a subchannel batch.
static void invoke_recv_message_callback(void* arg, grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+ channel_data* chand =
+ static_cast<channel_data*>(batch_data->elem->channel_data);
+ call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending op.
- pending_batch* pending = pending_batch_find(
- batch_data->elem, "invoking recv_message_ready for",
- [](grpc_transport_stream_op_batch* batch) {
- return batch->recv_message &&
- batch->payload->recv_message.recv_message_ready != nullptr;
- });
+ pending_batch* pending = nullptr;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
+ if (batch != nullptr && batch->recv_message &&
+ batch->payload->recv_message.recv_message_ready != nullptr) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: invoking recv_message_ready for "
+ "pending batch at index %" PRIuPTR,
+ chand, calld, i);
+ }
+ pending = &calld->pending_batches[i];
+ break;
+ }
+ }
GPR_ASSERT(pending != nullptr);
// Return payload.
*pending->batch->payload->recv_message.recv_message =
@@ -1680,18 +1751,10 @@ static void recv_message_ready(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
- ++retry_state->completed_recv_message_count;
- // If a retry was already dispatched, then we're not going to use the
- // result of this recv_message op, so do nothing.
- if (retry_state->retry_dispatched) {
- GRPC_CALL_COMBINER_STOP(calld->call_combiner,
- "recv_message_ready after retry dispatched");
- return;
- }
// If we got an error or the payload was nullptr and we have not yet gotten
- // the recv_trailing_metadata_ready callback, then defer propagating this
- // callback back to the surface. We can evaluate whether to retry when
- // recv_trailing_metadata comes back.
+ // the recv_trailing_metadata on_complete callback, then defer
+ // propagating this callback back to the surface. We can evaluate whether
+ // to retry when recv_trailing_metadata comes back.
if (GPR_UNLIKELY(
(batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1714,268 +1777,133 @@ static void recv_message_ready(void* arg, grpc_error* error) {
}
// Received a valid message, so commit the call.
retry_commit(elem, retry_state);
- // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_message_callback(batch_data, error);
+ GRPC_ERROR_UNREF(error);
}
//
-// recv_trailing_metadata handling
+// list of closures to execute in call combiner
//
-// Sets *status and *server_pushback_md based on batch_data and error.
-static void get_call_status(subchannel_batch_data* batch_data,
- grpc_error* error, grpc_status_code* status,
- grpc_mdelem** server_pushback_md) {
- grpc_call_element* elem = batch_data->elem;
+// Represents a closure that needs to run in the call combiner as part of
+// starting or completing a batch.
+typedef struct {
+ grpc_closure* closure;
+ grpc_error* error;
+ const char* reason;
+ bool free_reason = false;
+} closure_to_execute;
+
+static void execute_closures_in_call_combiner(grpc_call_element* elem,
+ const char* caller,
+ closure_to_execute* closures,
+ size_t num_closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
- if (error != GRPC_ERROR_NONE) {
- grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
- nullptr);
+ // Note that the call combiner will be yielded for each closure that
+ // we schedule. We're already running in the call combiner, so one of
+ // the closures can be scheduled directly, but the others will
+ // have to re-enter the call combiner.
+ if (num_closures > 0) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
+ calld, caller, closures[0].reason);
+ }
+ GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
+ if (closures[0].free_reason) {
+ gpr_free(const_cast<char*>(closures[0].reason));
+ }
+ for (size_t i = 1; i < num_closures; ++i) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: %s starting closure in call combiner: %s",
+ chand, calld, caller, closures[i].reason);
+ }
+ GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
+ closures[i].error, closures[i].reason);
+ if (closures[i].free_reason) {
+ gpr_free(const_cast<char*>(closures[i].reason));
+ }
+ }
} else {
- grpc_metadata_batch* md_batch =
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata;
- GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
- *status =
- grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
- if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
- *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
+ calld, caller);
}
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
}
- GRPC_ERROR_UNREF(error);
}
-// Adds recv_trailing_metadata_ready closure to closures.
-static void add_closure_for_recv_trailing_metadata_ready(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
- // Find pending batch.
- pending_batch* pending = pending_batch_find(
- elem, "invoking recv_trailing_metadata for",
- [](grpc_transport_stream_op_batch* batch) {
- return batch->recv_trailing_metadata &&
- batch->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready != nullptr;
- });
- // If we generated the recv_trailing_metadata op internally via
- // start_internal_recv_trailing_metadata(), then there will be no
- // pending batch.
- if (pending == nullptr) {
- GRPC_ERROR_UNREF(error);
- return;
+//
+// on_complete callback handling
+//
+
+// Updates retry_state to reflect the ops completed in batch_data.
+static void update_retry_state_for_completed_batch(
+ subchannel_batch_data* batch_data,
+ subchannel_call_retry_state* retry_state) {
+ if (batch_data->batch.send_initial_metadata) {
+ retry_state->completed_send_initial_metadata = true;
+ }
+ if (batch_data->batch.send_message) {
+ ++retry_state->completed_send_message_count;
+ }
+ if (batch_data->batch.send_trailing_metadata) {
+ retry_state->completed_send_trailing_metadata = true;
+ }
+ if (batch_data->batch.recv_initial_metadata) {
+ retry_state->completed_recv_initial_metadata = true;
+ }
+ if (batch_data->batch.recv_message) {
+ ++retry_state->completed_recv_message_count;
+ }
+ if (batch_data->batch.recv_trailing_metadata) {
+ retry_state->completed_recv_trailing_metadata = true;
}
- // Return metadata.
- grpc_metadata_batch_move(
- &batch_data->recv_trailing_metadata,
- pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
- // Add closure.
- closures->Add(pending->batch->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready,
- error, "recv_trailing_metadata_ready for pending batch");
- // Update bookkeeping.
- pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- nullptr;
- maybe_clear_pending_batch(elem, pending);
}
// Adds any necessary closures for deferred recv_initial_metadata and
-// recv_message callbacks to closures.
+// recv_message callbacks to closures, updating *num_closures as needed.
static void add_closures_for_deferred_recv_callbacks(
subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
- grpc_core::CallCombinerClosureList* closures) {
+ closure_to_execute* closures, size_t* num_closures) {
if (batch_data->batch.recv_trailing_metadata) {
// Add closure for deferred recv_initial_metadata_ready.
if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
nullptr)) {
- GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
- invoke_recv_initial_metadata_callback,
- retry_state->recv_initial_metadata_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closures->Add(&batch_data->recv_initial_metadata_ready,
- retry_state->recv_initial_metadata_error,
- "resuming recv_initial_metadata_ready");
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->recv_initial_metadata_ready,
+ invoke_recv_initial_metadata_callback,
+ retry_state->recv_initial_metadata_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closure->error = retry_state->recv_initial_metadata_error;
+ closure->reason = "resuming recv_initial_metadata_ready";
retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
}
// Add closure for deferred recv_message_ready.
if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
nullptr)) {
- GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
- invoke_recv_message_callback,
- retry_state->recv_message_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closures->Add(&batch_data->recv_message_ready,
- retry_state->recv_message_error,
- "resuming recv_message_ready");
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->recv_message_ready, invoke_recv_message_callback,
+ retry_state->recv_message_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closure->error = retry_state->recv_message_error;
+ closure->reason = "resuming recv_message_ready";
retry_state->recv_message_ready_deferred_batch = nullptr;
}
}
}
-// Returns true if any op in the batch was not yet started.
-// Only looks at send ops, since recv ops are always started immediately.
-static bool pending_batch_is_unstarted(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
- if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
- return false;
- }
- if (pending->batch->send_initial_metadata &&
- !retry_state->started_send_initial_metadata) {
- return true;
- }
- if (pending->batch->send_message &&
- retry_state->started_send_message_count < calld->send_messages->size()) {
- return true;
- }
- if (pending->batch->send_trailing_metadata &&
- !retry_state->started_send_trailing_metadata) {
- return true;
- }
- return false;
-}
-
-// For any pending batch containing an op that has not yet been started,
-// adds the pending batch's completion closures to closures.
-static void add_closures_to_fail_unstarted_pending_batches(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_unstarted(pending, calld, retry_state)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: failing unstarted pending batch at index "
- "%" PRIuPTR,
- chand, calld, i);
- }
- closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
- "failing on_complete for pending batch");
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
- }
- }
- GRPC_ERROR_UNREF(error);
-}
-
-// Runs necessary closures upon completion of a call attempt.
-static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
- grpc_error* error) {
- grpc_call_element* elem = batch_data->elem;
- call_data* calld = static_cast<call_data*>(elem->call_data);
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- grpc_connected_subchannel_call_get_parent_data(
- batch_data->subchannel_call));
- // Construct list of closures to execute.
- grpc_core::CallCombinerClosureList closures;
- // First, add closure for recv_trailing_metadata_ready.
- add_closure_for_recv_trailing_metadata_ready(
- elem, batch_data, GRPC_ERROR_REF(error), &closures);
- // If there are deferred recv_initial_metadata_ready or recv_message_ready
- // callbacks, add them to closures.
- add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
- // Add closures to fail any pending batches that have not yet been started.
- add_closures_to_fail_unstarted_pending_batches(
- elem, retry_state, GRPC_ERROR_REF(error), &closures);
- // Don't need batch_data anymore.
- batch_data_unref(batch_data);
- // Schedule all of the closures identified above.
- // Note: This will release the call combiner.
- closures.RunClosures(calld->call_combiner);
- GRPC_ERROR_UNREF(error);
-}
-
-// Intercepts recv_trailing_metadata_ready callback for retries.
-// Commits the call and returns the trailing metadata up the stack.
-static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
- subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
- grpc_call_element* elem = batch_data->elem;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
- chand, calld, grpc_error_string(error));
- }
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- grpc_connected_subchannel_call_get_parent_data(
- batch_data->subchannel_call));
- retry_state->completed_recv_trailing_metadata = true;
- // Get the call's status and check for server pushback metadata.
- grpc_status_code status = GRPC_STATUS_OK;
- grpc_mdelem* server_pushback_md = nullptr;
- get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
- &server_pushback_md);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
- calld, grpc_status_code_to_string(status));
- }
- // Check if we should retry.
- if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
- // Unref batch_data for deferred recv_initial_metadata_ready or
- // recv_message_ready callbacks, if any.
- if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
- }
- if (retry_state->recv_message_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_message_error);
- }
- batch_data_unref(batch_data);
- return;
- }
- // Not retrying, so commit the call.
- retry_commit(elem, retry_state);
- // Run any necessary closures.
- run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
-}
-
-//
-// on_complete callback handling
-//
-
-// Adds the on_complete closure for the pending batch completed in
-// batch_data to closures.
-static void add_closure_for_completed_pending_batch(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, grpc_error* error,
- grpc_core::CallCombinerClosureList* closures) {
- pending_batch* pending = pending_batch_find(
- elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
- // Match the pending batch with the same set of send ops as the
- // subchannel batch we've just completed.
- return batch->on_complete != nullptr &&
- batch_data->batch.send_initial_metadata ==
- batch->send_initial_metadata &&
- batch_data->batch.send_message == batch->send_message &&
- batch_data->batch.send_trailing_metadata ==
- batch->send_trailing_metadata;
- });
- // If batch_data is a replay batch, then there will be no pending
- // batch to complete.
- if (pending == nullptr) {
- GRPC_ERROR_UNREF(error);
- return;
- }
- // Add closure.
- closures->Add(pending->batch->on_complete, error,
- "on_complete for pending batch");
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
-}
-
// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
-// start_retriable_subchannel_batches().
+// start_retriable_subchannel_batches(), updating *num_closures as needed.
static void add_closures_for_replay_or_pending_send_ops(
grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state,
- grpc_core::CallCombinerClosureList* closures) {
+ subchannel_call_retry_state* retry_state, closure_to_execute* closures,
+ size_t* num_closures) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
bool have_pending_send_message_ops =
@@ -2001,12 +1929,93 @@ static void add_closures_for_replay_or_pending_send_ops(
"chand=%p calld=%p: starting next batch for pending send op(s)",
chand, calld);
}
- GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
- start_retriable_subchannel_batches, elem,
- grpc_schedule_on_exec_ctx);
- closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
- "starting next batch for send_* op(s)");
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->batch.handler_private.closure,
+ start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
+ closure->error = GRPC_ERROR_NONE;
+ closure->reason = "starting next batch for send_* op(s)";
+ }
+}
+
+// For any pending batch completed in batch_data, adds the necessary
+// completion closures to closures, updating *num_closures as needed.
+static void add_closures_for_completed_pending_batches(
+ grpc_call_element* elem, subchannel_batch_data* batch_data,
+ subchannel_call_retry_state* retry_state, grpc_error* error,
+ closure_to_execute* closures, size_t* num_closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ if (pending_batch_is_completed(pending, calld, retry_state)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
+ chand, calld, i);
+ }
+ // Copy the trailing metadata to return it to the surface.
+ if (batch_data->batch.recv_trailing_metadata) {
+ grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
+ pending->batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata);
+ }
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = pending->batch->on_complete;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason = "on_complete for pending batch";
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+ }
}
+ GRPC_ERROR_UNREF(error);
+}
+
+// For any pending batch containing an op that has not yet been started,
+// adds the pending batch's completion closures to closures, updating
+// *num_closures as needed.
+static void add_closures_to_fail_unstarted_pending_batches(
+ grpc_call_element* elem, subchannel_call_retry_state* retry_state,
+ grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ if (pending_batch_is_unstarted(pending, calld, retry_state)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: failing unstarted pending batch at index "
+ "%" PRIuPTR,
+ chand, calld, i);
+ }
+ if (pending->batch->recv_initial_metadata) {
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = pending->batch->payload->recv_initial_metadata
+ .recv_initial_metadata_ready;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason =
+ "failing recv_initial_metadata_ready for pending batch";
+ pending->batch->payload->recv_initial_metadata
+ .recv_initial_metadata_ready = nullptr;
+ }
+ if (pending->batch->recv_message) {
+ *pending->batch->payload->recv_message.recv_message = nullptr;
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure =
+ pending->batch->payload->recv_message.recv_message_ready;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason = "failing recv_message_ready for pending batch";
+ pending->batch->payload->recv_message.recv_message_ready = nullptr;
+ }
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = pending->batch->on_complete;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason = "failing on_complete for pending batch";
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+ }
+ }
+ GRPC_ERROR_UNREF(error);
}
// Callback used to intercept on_complete from subchannel calls.
@@ -2026,49 +2035,136 @@ static void on_complete(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
+ // If we have previously completed recv_trailing_metadata, then the
+ // call is finished.
+ bool call_finished = retry_state->completed_recv_trailing_metadata;
+ // Record whether we were already committed before receiving this callback.
+ const bool previously_committed = calld->retry_committed;
// Update bookkeeping in retry_state.
- if (batch_data->batch.send_initial_metadata) {
- retry_state->completed_send_initial_metadata = true;
- }
- if (batch_data->batch.send_message) {
- ++retry_state->completed_send_message_count;
- }
- if (batch_data->batch.send_trailing_metadata) {
- retry_state->completed_send_trailing_metadata = true;
+ update_retry_state_for_completed_batch(batch_data, retry_state);
+ if (call_finished) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
+ calld);
+ }
+ } else {
+ // Check if this batch finished the call, and if so, get its status.
+ // The call is finished if either (a) this callback was invoked with
+ // an error or (b) we receive status.
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_mdelem* server_pushback_md = nullptr;
+ if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a).
+ call_finished = true;
+ grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
+ nullptr);
+ } else if (batch_data->batch.recv_trailing_metadata) { // Case (b).
+ call_finished = true;
+ grpc_metadata_batch* md_batch =
+ batch_data->batch.payload->recv_trailing_metadata
+ .recv_trailing_metadata;
+ GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+ status = grpc_get_status_code_from_metadata(
+ md_batch->idx.named.grpc_status->md);
+ if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+ server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+ }
+ }
+ // If the call just finished, check if we should retry.
+ if (call_finished) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
+ calld, grpc_status_code_to_string(status));
+ }
+ if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+ // Unref batch_data for deferred recv_initial_metadata_ready or
+ // recv_message_ready callbacks, if any.
+ if (batch_data->batch.recv_trailing_metadata &&
+ retry_state->recv_initial_metadata_ready_deferred_batch !=
+ nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+ }
+ if (batch_data->batch.recv_trailing_metadata &&
+ retry_state->recv_message_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_message_error);
+ }
+ // Track number of pending subchannel send batches and determine if
+ // this was the last one.
+ bool last_callback_complete = false;
+ if (batch_data->batch.send_initial_metadata ||
+ batch_data->batch.send_message ||
+ batch_data->batch.send_trailing_metadata) {
+ --calld->num_pending_retriable_subchannel_send_batches;
+ last_callback_complete =
+ calld->num_pending_retriable_subchannel_send_batches == 0;
+ }
+ batch_data_unref(batch_data);
+ // If we just completed the last subchannel send batch, unref the
+ // call stack.
+ if (last_callback_complete) {
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
+ }
+ return;
+ }
+ // Not retrying, so commit the call.
+ retry_commit(elem, retry_state);
+ }
}
- // If the call is committed, free cached data for send ops that we've just
- // completed.
- if (calld->retry_committed) {
+ // If we were already committed before receiving this callback, free
+ // cached data for send ops that we've just completed. (If the call has
+ // just now finished, the call to retry_commit() above will have freed all
+ // cached send ops, so we don't need to do it here.)
+ if (previously_committed) {
free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
}
+ // Call not being retried.
// Construct list of closures to execute.
- grpc_core::CallCombinerClosureList closures;
- // If a retry was already dispatched, that means we saw
- // recv_trailing_metadata before this, so we do nothing here.
- // Otherwise, invoke the callback to return the result to the surface.
- if (!retry_state->retry_dispatched) {
- // Add closure for the completed pending batch, if any.
- add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
- GRPC_ERROR_REF(error), &closures);
- // If needed, add a callback to start any replay or pending send ops on
- // the subchannel call.
- if (!retry_state->completed_recv_trailing_metadata) {
- add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
- &closures);
- }
+ // Max number of closures is number of pending batches plus one for
+ // each of:
+ // - recv_initial_metadata_ready (either deferred or unstarted)
+ // - recv_message_ready (either deferred or unstarted)
+ // - starting a new batch for pending send ops
+ closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
+ size_t num_closures = 0;
+ // If there are deferred recv_initial_metadata_ready or recv_message_ready
+ // callbacks, add them to closures.
+ add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
+ &num_closures);
+ // Find pending batches whose ops are now complete and add their
+ // on_complete callbacks to closures.
+ add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
+ GRPC_ERROR_REF(error), closures,
+ &num_closures);
+ // Add closures to handle any pending batches that have not yet been started.
+ // If the call is finished, we fail these batches; otherwise, we add a
+ // callback to start_retriable_subchannel_batches() to start them on
+ // the subchannel call.
+ if (call_finished) {
+ add_closures_to_fail_unstarted_pending_batches(
+ elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
+ } else {
+ add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
+ closures, &num_closures);
}
// Track number of pending subchannel send batches and determine if this
// was the last one.
- --calld->num_pending_retriable_subchannel_send_batches;
- const bool last_send_batch_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
+ bool last_callback_complete = false;
+ if (batch_data->batch.send_initial_metadata ||
+ batch_data->batch.send_message ||
+ batch_data->batch.send_trailing_metadata) {
+ --calld->num_pending_retriable_subchannel_send_batches;
+ last_callback_complete =
+ calld->num_pending_retriable_subchannel_send_batches == 0;
+ }
// Don't need batch_data anymore.
batch_data_unref(batch_data);
// Schedule all of the closures identified above.
// Note: This yeilds the call combiner.
- closures.RunClosures(calld->call_combiner);
- // If this was the last subchannel send batch, unref the call stack.
- if (last_send_batch_complete) {
+ execute_closures_in_call_combiner(elem, "on_complete", closures,
+ num_closures);
+ // If we just completed the last subchannel send batch, unref the call stack.
+ if (last_callback_complete) {
GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
}
}
@@ -2089,22 +2185,27 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
// Adds a closure to closures that will execute batch in the call combiner.
static void add_closure_for_subchannel_batch(
- grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
- grpc_core::CallCombinerClosureList* closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
+ call_data* calld, grpc_transport_stream_op_batch* batch,
+ closure_to_execute* closures, size_t* num_closures) {
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
start_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = &batch->handler_private.closure;
+ closure->error = GRPC_ERROR_NONE;
+ // If the tracer is enabled, we log a more detailed message, which
+ // requires dynamic allocation. This will be freed in
+ // start_retriable_subchannel_batches().
if (grpc_client_channel_trace.enabled()) {
char* batch_str = grpc_transport_stream_op_batch_string(batch);
- gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
- calld, batch_str);
+ gpr_asprintf(const_cast<char**>(&closure->reason),
+ "starting batch in call combiner: %s", batch_str);
gpr_free(batch_str);
+ closure->free_reason = true;
+ } else {
+ closure->reason = "start_subchannel_batch";
}
- closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
- "start_subchannel_batch");
}
// Adds retriable send_initial_metadata op to batch_data.
@@ -2240,13 +2341,9 @@ static void add_retriable_recv_trailing_metadata_op(
grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
&batch_data->recv_trailing_metadata;
- batch_data->batch.payload->recv_trailing_metadata.collect_stats =
+ batch_data->batch.collect_stats = true;
+ batch_data->batch.payload->collect_stats.collect_stats =
&batch_data->collect_stats;
- GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
- recv_trailing_metadata_ready, batch_data,
- grpc_schedule_on_exec_ctx);
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
}
// Helper function used to start a recv_trailing_metadata batch. This
@@ -2267,11 +2364,9 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Create batch_data with 2 refs, since this batch will be unreffed twice:
- // once for the recv_trailing_metadata_ready callback when the subchannel
- // batch returns, and again when we actually get a recv_trailing_metadata
- // op from the surface.
- subchannel_batch_data* batch_data =
- batch_data_create(elem, 2, false /* set_on_complete */);
+ // once when the subchannel batch returns, and again when we actually get
+ // a recv_trailing_metadata op from the surface.
+ subchannel_batch_data* batch_data = batch_data_create(elem, 2);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
retry_state->recv_trailing_metadata_internal_batch = batch_data;
// Note: This will release the call combiner.
@@ -2296,7 +2391,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
"send_initial_metadata op",
chand, calld);
}
- replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
+ replay_batch_data = batch_data_create(elem, 1);
add_retriable_send_initial_metadata_op(calld, retry_state,
replay_batch_data);
}
@@ -2313,8 +2408,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data =
- batch_data_create(elem, 1, true /* set_on_complete */);
+ replay_batch_data = batch_data_create(elem, 1);
}
add_retriable_send_message_op(elem, retry_state, replay_batch_data);
}
@@ -2333,8 +2427,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data =
- batch_data_create(elem, 1, true /* set_on_complete */);
+ replay_batch_data = batch_data_create(elem, 1);
}
add_retriable_send_trailing_metadata_op(calld, retry_state,
replay_batch_data);
@@ -2346,7 +2439,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// *num_batches as needed.
static void add_subchannel_batches_for_pending_batches(
grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_core::CallCombinerClosureList* closures) {
+ closure_to_execute* closures, size_t* num_closures) {
call_data* calld = static_cast<call_data*>(elem->call_data);
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
@@ -2402,11 +2495,13 @@ static void add_subchannel_batches_for_pending_batches(
if (retry_state->completed_recv_trailing_metadata) {
subchannel_batch_data* batch_data =
retry_state->recv_trailing_metadata_internal_batch;
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = &batch_data->on_complete;
// Batches containing recv_trailing_metadata always succeed.
- closures->Add(
- &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
- "re-executing recv_trailing_metadata_ready to propagate "
- "internally triggered result");
+ closure->error = GRPC_ERROR_NONE;
+ closure->reason =
+ "re-executing on_complete for recv_trailing_metadata "
+ "to propagate internally triggered result";
} else {
batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
}
@@ -2418,19 +2513,14 @@ static void add_subchannel_batches_for_pending_batches(
if (calld->method_params == nullptr ||
calld->method_params->retry_policy() == nullptr ||
calld->retry_committed) {
- add_closure_for_subchannel_batch(elem, batch, closures);
+ add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
pending_batch_clear(calld, pending);
continue;
}
// Create batch with the right number of callbacks.
- const bool has_send_ops = batch->send_initial_metadata ||
- batch->send_message ||
- batch->send_trailing_metadata;
- const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
- batch->recv_message +
- batch->recv_trailing_metadata;
- subchannel_batch_data* batch_data = batch_data_create(
- elem, num_callbacks, has_send_ops /* set_on_complete */);
+ const int num_callbacks =
+ 1 + batch->recv_initial_metadata + batch->recv_message;
+ subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
// Cache send ops if needed.
maybe_cache_send_ops_for_batch(calld, pending);
// send_initial_metadata.
@@ -2457,9 +2547,11 @@ static void add_subchannel_batches_for_pending_batches(
}
// recv_trailing_metadata.
if (batch->recv_trailing_metadata) {
+ GPR_ASSERT(batch->collect_stats);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
}
- add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
+ add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
+ num_closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (batch->send_initial_metadata || batch->send_message ||
@@ -2487,13 +2579,15 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Construct list of closures to execute, one for each pending batch.
- grpc_core::CallCombinerClosureList closures;
+ // We can start up to 6 batches.
+ closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
+ size_t num_closures = 0;
// Replay previously-returned send_* ops if needed.
subchannel_batch_data* replay_batch_data =
maybe_create_subchannel_batch_for_replay(elem, retry_state);
if (replay_batch_data != nullptr) {
- add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
- &closures);
+ add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
+ &num_closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (calld->num_pending_retriable_subchannel_send_batches == 0) {
@@ -2502,16 +2596,17 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
++calld->num_pending_retriable_subchannel_send_batches;
}
// Now add pending batches.
- add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
+ add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
+ &num_closures);
// Start batches on subchannel call.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" retriable batches on subchannel_call=%p",
- chand, calld, closures.size(), calld->subchannel_call);
+ chand, calld, num_closures, calld->subchannel_call);
}
- // Note: This will yield the call combiner.
- closures.RunClosures(calld->call_combiner);
+ execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
+ closures, num_closures);
}
//