aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-06-14 09:44:58 -0700
committerGravatar Mark D. Roth <roth@google.com>2018-06-14 09:44:58 -0700
commit817d28fed6184053153831ab194891be882df138 (patch)
tree4654b7e5046cc3f3ae3d071ae8fd3fcd5b285ea0
parent9a2c0a8641d1837185a60436adf9419209f89fbe (diff)
Revert "Merge pull request #15746 from grpc/revert-15709-recv_trailing_metadata_ready2"
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc891
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.cc28
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.h10
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.cc19
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc35
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.cc19
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc52
-rw-r--r--src/core/lib/channel/connected_channel.cc9
-rw-r--r--src/core/lib/gprpp/inlined_vector.h2
-rw-r--r--src/core/lib/iomgr/call_combiner.h80
-rw-r--r--src/core/lib/iomgr/closure.h5
-rw-r--r--src/core/lib/surface/call.cc81
-rw-r--r--src/core/lib/transport/transport.cc29
-rw-r--r--src/core/lib/transport/transport.h22
-rw-r--r--src/core/lib/transport/transport_op_string.cc7
-rw-r--r--test/core/gprpp/inlined_vector_test.cc2
-rw-r--r--test/cpp/microbenchmarks/bm_call_create.cc24
17 files changed, 709 insertions, 606 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index f141aabe70..8a29c80729 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -804,6 +804,7 @@ typedef struct {
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
+ grpc_closure recv_trailing_metadata_ready;
// For intercepting on_complete.
grpc_closure on_complete;
} subchannel_batch_data;
@@ -1179,35 +1180,24 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
"chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
elem->channel_data, calld, num_batches, grpc_error_string(error));
}
- grpc_transport_stream_op_batch*
- batches[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_batches = 0;
+ grpc_core::CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batches[num_batches++] = batch;
+ batch->handler_private.extra_arg = calld;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ fail_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
+ "pending_batches_fail");
pending_batch_clear(calld, pending);
}
}
- for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
- grpc_transport_stream_op_batch* batch = batches[i];
- batch->handler_private.extra_arg = calld;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- fail_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- &batch->handler_private.closure,
- GRPC_ERROR_REF(error), "pending_batches_fail");
- }
if (yield_call_combiner) {
- if (num_batches > 0) {
- // Note: This will release the call combiner.
- grpc_transport_stream_op_batch_finish_with_failure(
- batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
- } else {
- GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
- }
+ closures.RunClosures(calld->call_combiner);
+ } else {
+ closures.RunClosuresWithoutYielding(calld->call_combiner);
}
GRPC_ERROR_UNREF(error);
}
@@ -1242,30 +1232,22 @@ static void pending_batches_resume(grpc_call_element* elem) {
" pending batches on subchannel_call=%p",
chand, calld, num_batches, calld->subchannel_call);
}
- grpc_transport_stream_op_batch*
- batches[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_batches = 0;
+ grpc_core::CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batches[num_batches++] = batch;
+ batch->handler_private.extra_arg = calld->subchannel_call;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ resume_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+ "pending_batches_resume");
pending_batch_clear(calld, pending);
}
}
- for (size_t i = 1; i < num_batches; ++i) {
- grpc_transport_stream_op_batch* batch = batches[i];
- batch->handler_private.extra_arg = calld->subchannel_call;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- resume_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- &batch->handler_private.closure, GRPC_ERROR_NONE,
- "pending_batches_resume");
- }
- GPR_ASSERT(num_batches > 0);
// Note: This will release the call combiner.
- grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
+ closures.RunClosures(calld->call_combiner);
}
static void maybe_clear_pending_batch(grpc_call_element* elem,
@@ -1280,7 +1262,10 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
nullptr) &&
(!batch->recv_message ||
- batch->payload->recv_message.recv_message_ready == nullptr)) {
+ batch->payload->recv_message.recv_message_ready == nullptr) &&
+ (!batch->recv_trailing_metadata ||
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
+ nullptr)) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
calld);
@@ -1289,75 +1274,27 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
}
}
-// Returns true if all ops in the pending batch have been completed.
-static bool pending_batch_is_completed(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
- if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
- return false;
- }
- if (pending->batch->send_initial_metadata &&
- !retry_state->completed_send_initial_metadata) {
- return false;
- }
- if (pending->batch->send_message &&
- retry_state->completed_send_message_count <
- calld->send_messages->size()) {
- return false;
- }
- if (pending->batch->send_trailing_metadata &&
- !retry_state->completed_send_trailing_metadata) {
- return false;
- }
- if (pending->batch->recv_initial_metadata &&
- !retry_state->completed_recv_initial_metadata) {
- return false;
- }
- if (pending->batch->recv_message &&
- retry_state->completed_recv_message_count <
- retry_state->started_recv_message_count) {
- return false;
- }
- if (pending->batch->recv_trailing_metadata &&
- !retry_state->completed_recv_trailing_metadata) {
- return false;
- }
- return true;
-}
-
-// Returns true if any op in the batch was not yet started.
-static bool pending_batch_is_unstarted(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
- if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
- return false;
- }
- if (pending->batch->send_initial_metadata &&
- !retry_state->started_send_initial_metadata) {
- return true;
- }
- if (pending->batch->send_message &&
- retry_state->started_send_message_count < calld->send_messages->size()) {
- return true;
- }
- if (pending->batch->send_trailing_metadata &&
- !retry_state->started_send_trailing_metadata) {
- return true;
- }
- if (pending->batch->recv_initial_metadata &&
- !retry_state->started_recv_initial_metadata) {
- return true;
- }
- if (pending->batch->recv_message &&
- retry_state->completed_recv_message_count ==
- retry_state->started_recv_message_count) {
- return true;
- }
- if (pending->batch->recv_trailing_metadata &&
- !retry_state->started_recv_trailing_metadata) {
- return true;
+// Returns a pointer to the first pending batch for which predicate(batch)
+// returns true, or null if not found.
+template <typename Predicate>
+static pending_batch* pending_batch_find(grpc_call_element* elem,
+ const char* log_message,
+ Predicate predicate) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ grpc_transport_stream_op_batch* batch = pending->batch;
+ if (batch != nullptr && predicate(batch)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
+ calld, log_message, i);
+ }
+ return pending;
+ }
}
- return false;
+ return nullptr;
}
//
@@ -1544,8 +1481,13 @@ static bool maybe_retry(grpc_call_element* elem,
// subchannel_batch_data
//
+// Creates a subchannel_batch_data object on the call's arena with the
+// specified refcount. If set_on_complete is true, the batch's
+// on_complete callback will be set to point to on_complete();
+// otherwise, the batch's on_complete callback will be null.
static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
- int refcount) {
+ int refcount,
+ bool set_on_complete) {
call_data* calld = static_cast<call_data*>(elem->call_data);
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
@@ -1558,9 +1500,11 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
batch_data->batch.payload = &retry_state->batch_payload;
gpr_ref_init(&batch_data->refs, refcount);
- GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
- grpc_schedule_on_exec_ctx);
- batch_data->batch.on_complete = &batch_data->on_complete;
+ if (set_on_complete) {
+ GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
+ grpc_schedule_on_exec_ctx);
+ batch_data->batch.on_complete = &batch_data->on_complete;
+ }
GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
return batch_data;
}
@@ -1593,26 +1537,14 @@ static void batch_data_unref(subchannel_batch_data* batch_data) {
static void invoke_recv_initial_metadata_callback(void* arg,
grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
- channel_data* chand =
- static_cast<channel_data*>(batch_data->elem->channel_data);
- call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending batch.
- pending_batch* pending = nullptr;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
- if (batch != nullptr && batch->recv_initial_metadata &&
- batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
- nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
- "pending batch at index %" PRIuPTR,
- chand, calld, i);
- }
- pending = &calld->pending_batches[i];
- break;
- }
- }
+ pending_batch* pending = pending_batch_find(
+ batch_data->elem, "invoking recv_initial_metadata_ready for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_initial_metadata &&
+ batch->payload->recv_initial_metadata
+ .recv_initial_metadata_ready != nullptr;
+ });
GPR_ASSERT(pending != nullptr);
// Return metadata.
grpc_metadata_batch_move(
@@ -1648,10 +1580,19 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
+ retry_state->completed_recv_initial_metadata = true;
+ // If a retry was already dispatched, then we're not going to use the
+ // result of this recv_initial_metadata op, so do nothing.
+ if (retry_state->retry_dispatched) {
+ GRPC_CALL_COMBINER_STOP(
+ calld->call_combiner,
+ "recv_initial_metadata_ready after retry dispatched");
+ return;
+ }
// If we got an error or a Trailers-Only response and have not yet gotten
- // the recv_trailing_metadata on_complete callback, then defer
- // propagating this callback back to the surface. We can evaluate whether
- // to retry when recv_trailing_metadata comes back.
+ // the recv_trailing_metadata_ready callback, then defer propagating this
+ // callback back to the surface. We can evaluate whether to retry when
+ // recv_trailing_metadata comes back.
if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1676,9 +1617,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
}
// Received valid initial metadata, so commit the call.
retry_commit(elem, retry_state);
+ // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_initial_metadata_callback(batch_data, error);
- GRPC_ERROR_UNREF(error);
}
//
@@ -1688,25 +1629,13 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
// Invokes recv_message_ready for a subchannel batch.
static void invoke_recv_message_callback(void* arg, grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
- channel_data* chand =
- static_cast<channel_data*>(batch_data->elem->channel_data);
- call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending op.
- pending_batch* pending = nullptr;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
- if (batch != nullptr && batch->recv_message &&
- batch->payload->recv_message.recv_message_ready != nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: invoking recv_message_ready for "
- "pending batch at index %" PRIuPTR,
- chand, calld, i);
- }
- pending = &calld->pending_batches[i];
- break;
- }
- }
+ pending_batch* pending = pending_batch_find(
+ batch_data->elem, "invoking recv_message_ready for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_message &&
+ batch->payload->recv_message.recv_message_ready != nullptr;
+ });
GPR_ASSERT(pending != nullptr);
// Return payload.
*pending->batch->payload->recv_message.recv_message =
@@ -1738,10 +1667,18 @@ static void recv_message_ready(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
+ ++retry_state->completed_recv_message_count;
+ // If a retry was already dispatched, then we're not going to use the
+ // result of this recv_message op, so do nothing.
+ if (retry_state->retry_dispatched) {
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+ "recv_message_ready after retry dispatched");
+ return;
+ }
// If we got an error or the payload was nullptr and we have not yet gotten
- // the recv_trailing_metadata on_complete callback, then defer
- // propagating this callback back to the surface. We can evaluate whether
- // to retry when recv_trailing_metadata comes back.
+ // the recv_trailing_metadata_ready callback, then defer propagating this
+ // callback back to the surface. We can evaluate whether to retry when
+ // recv_trailing_metadata comes back.
if (GPR_UNLIKELY(
(batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1764,133 +1701,268 @@ static void recv_message_ready(void* arg, grpc_error* error) {
}
// Received a valid message, so commit the call.
retry_commit(elem, retry_state);
+ // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_message_callback(batch_data, error);
- GRPC_ERROR_UNREF(error);
}
//
-// list of closures to execute in call combiner
+// recv_trailing_metadata handling
//
-// Represents a closure that needs to run in the call combiner as part of
-// starting or completing a batch.
-typedef struct {
- grpc_closure* closure;
- grpc_error* error;
- const char* reason;
- bool free_reason = false;
-} closure_to_execute;
-
-static void execute_closures_in_call_combiner(grpc_call_element* elem,
- const char* caller,
- closure_to_execute* closures,
- size_t num_closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+// Sets *status and *server_pushback_md based on batch_data and error.
+static void get_call_status(subchannel_batch_data* batch_data,
+ grpc_error* error, grpc_status_code* status,
+ grpc_mdelem** server_pushback_md) {
+ grpc_call_element* elem = batch_data->elem;
call_data* calld = static_cast<call_data*>(elem->call_data);
- // Note that the call combiner will be yielded for each closure that
- // we schedule. We're already running in the call combiner, so one of
- // the closures can be scheduled directly, but the others will
- // have to re-enter the call combiner.
- if (num_closures > 0) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
- calld, caller, closures[0].reason);
- }
- GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
- if (closures[0].free_reason) {
- gpr_free(const_cast<char*>(closures[0].reason));
- }
- for (size_t i = 1; i < num_closures; ++i) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: %s starting closure in call combiner: %s",
- chand, calld, caller, closures[i].reason);
- }
- GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
- closures[i].error, closures[i].reason);
- if (closures[i].free_reason) {
- gpr_free(const_cast<char*>(closures[i].reason));
- }
- }
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
+ nullptr);
} else {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
- calld, caller);
+ grpc_metadata_batch* md_batch =
+ batch_data->batch.payload->recv_trailing_metadata
+ .recv_trailing_metadata;
+ GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+ *status =
+ grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
+ if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+ *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
}
- GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
}
+ GRPC_ERROR_UNREF(error);
}
-//
-// on_complete callback handling
-//
-
-// Updates retry_state to reflect the ops completed in batch_data.
-static void update_retry_state_for_completed_batch(
- subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state) {
- if (batch_data->batch.send_initial_metadata) {
- retry_state->completed_send_initial_metadata = true;
- }
- if (batch_data->batch.send_message) {
- ++retry_state->completed_send_message_count;
- }
- if (batch_data->batch.send_trailing_metadata) {
- retry_state->completed_send_trailing_metadata = true;
- }
- if (batch_data->batch.recv_initial_metadata) {
- retry_state->completed_recv_initial_metadata = true;
- }
- if (batch_data->batch.recv_message) {
- ++retry_state->completed_recv_message_count;
- }
- if (batch_data->batch.recv_trailing_metadata) {
- retry_state->completed_recv_trailing_metadata = true;
+// Adds recv_trailing_metadata_ready closure to closures.
+static void add_closure_for_recv_trailing_metadata_ready(
+ grpc_call_element* elem, subchannel_batch_data* batch_data,
+ grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
+ // Find pending batch.
+ pending_batch* pending = pending_batch_find(
+ elem, "invoking recv_trailing_metadata for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_trailing_metadata &&
+ batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready != nullptr;
+ });
+ // If we generated the recv_trailing_metadata op internally via
+ // start_internal_recv_trailing_metadata(), then there will be no
+ // pending batch.
+ if (pending == nullptr) {
+ GRPC_ERROR_UNREF(error);
+ return;
}
+ // Return metadata.
+ grpc_metadata_batch_move(
+ &batch_data->recv_trailing_metadata,
+ pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
+ // Add closure.
+ closures->Add(pending->batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ error, "recv_trailing_metadata_ready for pending batch");
+ // Update bookkeeping.
+ pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ nullptr;
+ maybe_clear_pending_batch(elem, pending);
}
// Adds any necessary closures for deferred recv_initial_metadata and
-// recv_message callbacks to closures, updating *num_closures as needed.
+// recv_message callbacks to closures.
static void add_closures_for_deferred_recv_callbacks(
subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
- closure_to_execute* closures, size_t* num_closures) {
+ grpc_core::CallCombinerClosureList* closures) {
if (batch_data->batch.recv_trailing_metadata) {
// Add closure for deferred recv_initial_metadata_ready.
if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
nullptr)) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->recv_initial_metadata_ready,
- invoke_recv_initial_metadata_callback,
- retry_state->recv_initial_metadata_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_initial_metadata_error;
- closure->reason = "resuming recv_initial_metadata_ready";
+ GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
+ invoke_recv_initial_metadata_callback,
+ retry_state->recv_initial_metadata_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->recv_initial_metadata_ready,
+ retry_state->recv_initial_metadata_error,
+ "resuming recv_initial_metadata_ready");
retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
}
// Add closure for deferred recv_message_ready.
if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
nullptr)) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->recv_message_ready, invoke_recv_message_callback,
- retry_state->recv_message_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_message_error;
- closure->reason = "resuming recv_message_ready";
+ GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
+ invoke_recv_message_callback,
+ retry_state->recv_message_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->recv_message_ready,
+ retry_state->recv_message_error,
+ "resuming recv_message_ready");
retry_state->recv_message_ready_deferred_batch = nullptr;
}
}
}
+// Returns true if any op in the batch was not yet started.
+// Only looks at send ops, since recv ops are always started immediately.
+static bool pending_batch_is_unstarted(
+ pending_batch* pending, call_data* calld,
+ subchannel_call_retry_state* retry_state) {
+ if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+ return false;
+ }
+ if (pending->batch->send_initial_metadata &&
+ !retry_state->started_send_initial_metadata) {
+ return true;
+ }
+ if (pending->batch->send_message &&
+ retry_state->started_send_message_count < calld->send_messages->size()) {
+ return true;
+ }
+ if (pending->batch->send_trailing_metadata &&
+ !retry_state->started_send_trailing_metadata) {
+ return true;
+ }
+ return false;
+}
+
+// For any pending batch containing an op that has not yet been started,
+// adds the pending batch's completion closures to closures.
+static void add_closures_to_fail_unstarted_pending_batches(
+ grpc_call_element* elem, subchannel_call_retry_state* retry_state,
+ grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ if (pending_batch_is_unstarted(pending, calld, retry_state)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: failing unstarted pending batch at index "
+ "%" PRIuPTR,
+ chand, calld, i);
+ }
+ closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
+ "failing on_complete for pending batch");
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+ }
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+// Runs necessary closures upon completion of a call attempt.
+static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
+ grpc_error* error) {
+ grpc_call_element* elem = batch_data->elem;
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
+ // Construct list of closures to execute.
+ grpc_core::CallCombinerClosureList closures;
+ // First, add closure for recv_trailing_metadata_ready.
+ add_closure_for_recv_trailing_metadata_ready(
+ elem, batch_data, GRPC_ERROR_REF(error), &closures);
+ // If there are deferred recv_initial_metadata_ready or recv_message_ready
+ // callbacks, add them to closures.
+ add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
+ // Add closures to fail any pending batches that have not yet been started.
+ add_closures_to_fail_unstarted_pending_batches(
+ elem, retry_state, GRPC_ERROR_REF(error), &closures);
+ // Don't need batch_data anymore.
+ batch_data_unref(batch_data);
+ // Schedule all of the closures identified above.
+ // Note: This will release the call combiner.
+ closures.RunClosures(calld->call_combiner);
+ GRPC_ERROR_UNREF(error);
+}
+
+// Intercepts recv_trailing_metadata_ready callback for retries.
+// Commits the call and returns the trailing metadata up the stack.
+static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
+ subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+ grpc_call_element* elem = batch_data->elem;
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
+ chand, calld, grpc_error_string(error));
+ }
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
+ retry_state->completed_recv_trailing_metadata = true;
+ // Get the call's status and check for server pushback metadata.
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_mdelem* server_pushback_md = nullptr;
+ get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
+ &server_pushback_md);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
+ calld, grpc_status_code_to_string(status));
+ }
+ // Check if we should retry.
+ if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+ // Unref batch_data for deferred recv_initial_metadata_ready or
+ // recv_message_ready callbacks, if any.
+ if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+ }
+ if (retry_state->recv_message_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_message_error);
+ }
+ batch_data_unref(batch_data);
+ return;
+ }
+ // Not retrying, so commit the call.
+ retry_commit(elem, retry_state);
+ // Run any necessary closures.
+ run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
+}
+
+//
+// on_complete callback handling
+//
+
+// Adds the on_complete closure for the pending batch completed in
+// batch_data to closures.
+static void add_closure_for_completed_pending_batch(
+ grpc_call_element* elem, subchannel_batch_data* batch_data,
+ subchannel_call_retry_state* retry_state, grpc_error* error,
+ grpc_core::CallCombinerClosureList* closures) {
+ pending_batch* pending = pending_batch_find(
+ elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
+ // Match the pending batch with the same set of send ops as the
+ // subchannel batch we've just completed.
+ return batch->on_complete != nullptr &&
+ batch_data->batch.send_initial_metadata ==
+ batch->send_initial_metadata &&
+ batch_data->batch.send_message == batch->send_message &&
+ batch_data->batch.send_trailing_metadata ==
+ batch->send_trailing_metadata;
+ });
+ // If batch_data is a replay batch, then there will be no pending
+ // batch to complete.
+ if (pending == nullptr) {
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ // Add closure.
+ closures->Add(pending->batch->on_complete, error,
+ "on_complete for pending batch");
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+}
+
// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
-// start_retriable_subchannel_batches(), updating *num_closures as needed.
+// start_retriable_subchannel_batches().
static void add_closures_for_replay_or_pending_send_ops(
grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, closure_to_execute* closures,
- size_t* num_closures) {
+ subchannel_call_retry_state* retry_state,
+ grpc_core::CallCombinerClosureList* closures) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
bool have_pending_send_message_ops =
@@ -1916,93 +1988,12 @@ static void add_closures_for_replay_or_pending_send_ops(
"chand=%p calld=%p: starting next batch for pending send op(s)",
chand, calld);
}
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->batch.handler_private.closure,
- start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
- closure->error = GRPC_ERROR_NONE;
- closure->reason = "starting next batch for send_* op(s)";
- }
-}
-
-// For any pending batch completed in batch_data, adds the necessary
-// completion closures to closures, updating *num_closures as needed.
-static void add_closures_for_completed_pending_batches(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, grpc_error* error,
- closure_to_execute* closures, size_t* num_closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_completed(pending, calld, retry_state)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
- chand, calld, i);
- }
- // Copy the trailing metadata to return it to the surface.
- if (batch_data->batch.recv_trailing_metadata) {
- grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
- pending->batch->payload->recv_trailing_metadata
- .recv_trailing_metadata);
- }
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->on_complete;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "on_complete for pending batch";
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
- }
+ GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
+ start_retriable_subchannel_batches, elem,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
+ "starting next batch for send_* op(s)");
}
- GRPC_ERROR_UNREF(error);
-}
-
-// For any pending batch containing an op that has not yet been started,
-// adds the pending batch's completion closures to closures, updating
-// *num_closures as needed.
-static void add_closures_to_fail_unstarted_pending_batches(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_unstarted(pending, calld, retry_state)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: failing unstarted pending batch at index "
- "%" PRIuPTR,
- chand, calld, i);
- }
- if (pending->batch->recv_initial_metadata) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->payload->recv_initial_metadata
- .recv_initial_metadata_ready;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason =
- "failing recv_initial_metadata_ready for pending batch";
- pending->batch->payload->recv_initial_metadata
- .recv_initial_metadata_ready = nullptr;
- }
- if (pending->batch->recv_message) {
- *pending->batch->payload->recv_message.recv_message = nullptr;
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure =
- pending->batch->payload->recv_message.recv_message_ready;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "failing recv_message_ready for pending batch";
- pending->batch->payload->recv_message.recv_message_ready = nullptr;
- }
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->on_complete;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "failing on_complete for pending batch";
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
- }
- }
- GRPC_ERROR_UNREF(error);
}
// Callback used to intercept on_complete from subchannel calls.
@@ -2022,136 +2013,49 @@ static void on_complete(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
- // If we have previously completed recv_trailing_metadata, then the
- // call is finished.
- bool call_finished = retry_state->completed_recv_trailing_metadata;
- // Record whether we were already committed before receiving this callback.
- const bool previously_committed = calld->retry_committed;
// Update bookkeeping in retry_state.
- update_retry_state_for_completed_batch(batch_data, retry_state);
- if (call_finished) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
- calld);
- }
- } else {
- // Check if this batch finished the call, and if so, get its status.
- // The call is finished if either (a) this callback was invoked with
- // an error or (b) we receive status.
- grpc_status_code status = GRPC_STATUS_OK;
- grpc_mdelem* server_pushback_md = nullptr;
- if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a).
- call_finished = true;
- grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
- nullptr);
- } else if (batch_data->batch.recv_trailing_metadata) { // Case (b).
- call_finished = true;
- grpc_metadata_batch* md_batch =
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata;
- GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
- status = grpc_get_status_code_from_metadata(
- md_batch->idx.named.grpc_status->md);
- if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
- server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
- }
- }
- // If the call just finished, check if we should retry.
- if (call_finished) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
- calld, grpc_status_code_to_string(status));
- }
- if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
- // Unref batch_data for deferred recv_initial_metadata_ready or
- // recv_message_ready callbacks, if any.
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_initial_metadata_ready_deferred_batch !=
- nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
- }
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_message_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_message_error);
- }
- // Track number of pending subchannel send batches and determine if
- // this was the last one.
- bool last_callback_complete = false;
- if (batch_data->batch.send_initial_metadata ||
- batch_data->batch.send_message ||
- batch_data->batch.send_trailing_metadata) {
- --calld->num_pending_retriable_subchannel_send_batches;
- last_callback_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
- }
- batch_data_unref(batch_data);
- // If we just completed the last subchannel send batch, unref the
- // call stack.
- if (last_callback_complete) {
- GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
- }
- return;
- }
- // Not retrying, so commit the call.
- retry_commit(elem, retry_state);
- }
+ if (batch_data->batch.send_initial_metadata) {
+ retry_state->completed_send_initial_metadata = true;
+ }
+ if (batch_data->batch.send_message) {
+ ++retry_state->completed_send_message_count;
}
- // If we were already committed before receiving this callback, free
- // cached data for send ops that we've just completed. (If the call has
- // just now finished, the call to retry_commit() above will have freed all
- // cached send ops, so we don't need to do it here.)
- if (previously_committed) {
+ if (batch_data->batch.send_trailing_metadata) {
+ retry_state->completed_send_trailing_metadata = true;
+ }
+ // If the call is committed, free cached data for send ops that we've just
+ // completed.
+ if (calld->retry_committed) {
free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
}
- // Call not being retried.
// Construct list of closures to execute.
- // Max number of closures is number of pending batches plus one for
- // each of:
- // - recv_initial_metadata_ready (either deferred or unstarted)
- // - recv_message_ready (either deferred or unstarted)
- // - starting a new batch for pending send ops
- closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
- size_t num_closures = 0;
- // If there are deferred recv_initial_metadata_ready or recv_message_ready
- // callbacks, add them to closures.
- add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
- &num_closures);
- // Find pending batches whose ops are now complete and add their
- // on_complete callbacks to closures.
- add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
- GRPC_ERROR_REF(error), closures,
- &num_closures);
- // Add closures to handle any pending batches that have not yet been started.
- // If the call is finished, we fail these batches; otherwise, we add a
- // callback to start_retriable_subchannel_batches() to start them on
- // the subchannel call.
- if (call_finished) {
- add_closures_to_fail_unstarted_pending_batches(
- elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
- } else {
- add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
- closures, &num_closures);
+ grpc_core::CallCombinerClosureList closures;
+ // If a retry was already dispatched, that means we saw
+ // recv_trailing_metadata before this, so we do nothing here.
+ // Otherwise, invoke the callback to return the result to the surface.
+ if (!retry_state->retry_dispatched) {
+ // Add closure for the completed pending batch, if any.
+ add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
+ GRPC_ERROR_REF(error), &closures);
+ // If needed, add a callback to start any replay or pending send ops on
+ // the subchannel call.
+ if (!retry_state->completed_recv_trailing_metadata) {
+ add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
+ &closures);
+ }
}
// Track number of pending subchannel send batches and determine if this
// was the last one.
- bool last_callback_complete = false;
- if (batch_data->batch.send_initial_metadata ||
- batch_data->batch.send_message ||
- batch_data->batch.send_trailing_metadata) {
- --calld->num_pending_retriable_subchannel_send_batches;
- last_callback_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
- }
+ --calld->num_pending_retriable_subchannel_send_batches;
+ const bool last_send_batch_complete =
+ calld->num_pending_retriable_subchannel_send_batches == 0;
// Don't need batch_data anymore.
batch_data_unref(batch_data);
// Schedule all of the closures identified above.
// Note: This yeilds the call combiner.
- execute_closures_in_call_combiner(elem, "on_complete", closures,
- num_closures);
- // If we just completed the last subchannel send batch, unref the call stack.
- if (last_callback_complete) {
+ closures.RunClosures(calld->call_combiner);
+ // If this was the last subchannel send batch, unref the call stack.
+ if (last_send_batch_complete) {
GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
}
}
@@ -2172,27 +2076,22 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
// Adds a closure to closures that will execute batch in the call combiner.
static void add_closure_for_subchannel_batch(
- call_data* calld, grpc_transport_stream_op_batch* batch,
- closure_to_execute* closures, size_t* num_closures) {
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
+ grpc_core::CallCombinerClosureList* closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
start_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = &batch->handler_private.closure;
- closure->error = GRPC_ERROR_NONE;
- // If the tracer is enabled, we log a more detailed message, which
- // requires dynamic allocation. This will be freed in
- // start_retriable_subchannel_batches().
if (grpc_client_channel_trace.enabled()) {
char* batch_str = grpc_transport_stream_op_batch_string(batch);
- gpr_asprintf(const_cast<char**>(&closure->reason),
- "starting batch in call combiner: %s", batch_str);
+ gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
+ calld, batch_str);
gpr_free(batch_str);
- closure->free_reason = true;
- } else {
- closure->reason = "start_subchannel_batch";
}
+ closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+ "start_subchannel_batch");
}
// Adds retriable send_initial_metadata op to batch_data.
@@ -2328,9 +2227,13 @@ static void add_retriable_recv_trailing_metadata_op(
grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
&batch_data->recv_trailing_metadata;
- batch_data->batch.collect_stats = true;
- batch_data->batch.payload->collect_stats.collect_stats =
+ batch_data->batch.payload->recv_trailing_metadata.collect_stats =
&batch_data->collect_stats;
+ GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, batch_data,
+ grpc_schedule_on_exec_ctx);
+ batch_data->batch.payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
}
// Helper function used to start a recv_trailing_metadata batch. This
@@ -2351,9 +2254,11 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Create batch_data with 2 refs, since this batch will be unreffed twice:
- // once when the subchannel batch returns, and again when we actually get
- // a recv_trailing_metadata op from the surface.
- subchannel_batch_data* batch_data = batch_data_create(elem, 2);
+ // once for the recv_trailing_metadata_ready callback when the subchannel
+ // batch returns, and again when we actually get a recv_trailing_metadata
+ // op from the surface.
+ subchannel_batch_data* batch_data =
+ batch_data_create(elem, 2, false /* set_on_complete */);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
retry_state->recv_trailing_metadata_internal_batch = batch_data;
// Note: This will release the call combiner.
@@ -2378,7 +2283,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
"send_initial_metadata op",
chand, calld);
}
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
add_retriable_send_initial_metadata_op(calld, retry_state,
replay_batch_data);
}
@@ -2395,7 +2300,8 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data =
+ batch_data_create(elem, 1, true /* set_on_complete */);
}
add_retriable_send_message_op(elem, retry_state, replay_batch_data);
}
@@ -2414,7 +2320,8 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data =
+ batch_data_create(elem, 1, true /* set_on_complete */);
}
add_retriable_send_trailing_metadata_op(calld, retry_state,
replay_batch_data);
@@ -2426,7 +2333,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// *num_batches as needed.
static void add_subchannel_batches_for_pending_batches(
grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- closure_to_execute* closures, size_t* num_closures) {
+ grpc_core::CallCombinerClosureList* closures) {
call_data* calld = static_cast<call_data*>(elem->call_data);
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
@@ -2482,13 +2389,11 @@ static void add_subchannel_batches_for_pending_batches(
if (retry_state->completed_recv_trailing_metadata) {
subchannel_batch_data* batch_data =
retry_state->recv_trailing_metadata_internal_batch;
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = &batch_data->on_complete;
// Batches containing recv_trailing_metadata always succeed.
- closure->error = GRPC_ERROR_NONE;
- closure->reason =
- "re-executing on_complete for recv_trailing_metadata "
- "to propagate internally triggered result";
+ closures->Add(
+ &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
+ "re-executing recv_trailing_metadata_ready to propagate "
+ "internally triggered result");
} else {
batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
}
@@ -2500,14 +2405,19 @@ static void add_subchannel_batches_for_pending_batches(
if (calld->method_params == nullptr ||
calld->method_params->retry_policy() == nullptr ||
calld->retry_committed) {
- add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
+ add_closure_for_subchannel_batch(elem, batch, closures);
pending_batch_clear(calld, pending);
continue;
}
// Create batch with the right number of callbacks.
- const int num_callbacks =
- 1 + batch->recv_initial_metadata + batch->recv_message;
- subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
+ const bool has_send_ops = batch->send_initial_metadata ||
+ batch->send_message ||
+ batch->send_trailing_metadata;
+ const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
+ batch->recv_message +
+ batch->recv_trailing_metadata;
+ subchannel_batch_data* batch_data = batch_data_create(
+ elem, num_callbacks, has_send_ops /* set_on_complete */);
// Cache send ops if needed.
maybe_cache_send_ops_for_batch(calld, pending);
// send_initial_metadata.
@@ -2534,11 +2444,9 @@ static void add_subchannel_batches_for_pending_batches(
}
// recv_trailing_metadata.
if (batch->recv_trailing_metadata) {
- GPR_ASSERT(batch->collect_stats);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
}
- add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
- num_closures);
+ add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (batch->send_initial_metadata || batch->send_message ||
@@ -2566,15 +2474,13 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Construct list of closures to execute, one for each pending batch.
- // We can start up to 6 batches.
- closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_closures = 0;
+ grpc_core::CallCombinerClosureList closures;
// Replay previously-returned send_* ops if needed.
subchannel_batch_data* replay_batch_data =
maybe_create_subchannel_batch_for_replay(elem, retry_state);
if (replay_batch_data != nullptr) {
- add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
- &num_closures);
+ add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
+ &closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (calld->num_pending_retriable_subchannel_send_batches == 0) {
@@ -2583,17 +2489,16 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
++calld->num_pending_retriable_subchannel_send_batches;
}
// Now add pending batches.
- add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
- &num_closures);
+ add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
// Start batches on subchannel call.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" retriable batches on subchannel_call=%p",
- chand, calld, num_closures, calld->subchannel_call);
+ chand, calld, closures.size(), calld->subchannel_call);
}
- execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
- closures, num_closures);
+ // Note: This will yield the call combiner.
+ closures.RunClosures(calld->call_combiner);
}
//
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc
index caa547204b..456f9b8f1e 100644
--- a/src/core/ext/filters/deadline/deadline_filter.cc
+++ b/src/core/ext/filters/deadline/deadline_filter.cc
@@ -128,21 +128,25 @@ static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
}
}
-// Callback run when the call is complete.
-static void on_complete(void* arg, grpc_error* error) {
+// Callback run when we receive trailing metadata.
+static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
cancel_timer_if_needed(deadline_state);
- // Invoke the next callback.
- GRPC_CLOSURE_RUN(deadline_state->next_on_complete, GRPC_ERROR_REF(error));
+ // Invoke the original callback.
+ GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error));
}
-// Inject our own on_complete callback into op.
-static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
- grpc_transport_stream_op_batch* op) {
- deadline_state->next_on_complete = op->on_complete;
- GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state,
+// Inject our own recv_trailing_metadata_ready callback into op.
+static void inject_recv_trailing_metadata_ready(
+ grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
+ deadline_state->original_recv_trailing_metadata_ready =
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, deadline_state,
grpc_schedule_on_exec_ctx);
- op->on_complete = &deadline_state->on_complete;
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &deadline_state->recv_trailing_metadata_ready;
}
// Callback and associated state for starting the timer after call stack
@@ -226,7 +230,7 @@ void grpc_deadline_state_client_start_transport_stream_op_batch(
// Make sure we know when the call is complete, so that we can cancel
// the timer.
if (op->recv_trailing_metadata) {
- inject_on_complete_cb(deadline_state, op);
+ inject_recv_trailing_metadata_ready(deadline_state, op);
}
}
}
@@ -322,7 +326,7 @@ static void server_start_transport_stream_op_batch(
// the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata) {
- inject_on_complete_cb(&calld->base.deadline_state, op);
+ inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
}
}
// Chain to next filter.
diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h
index 13207cbd6f..1d797f445a 100644
--- a/src/core/ext/filters/deadline/deadline_filter.h
+++ b/src/core/ext/filters/deadline/deadline_filter.h
@@ -37,12 +37,12 @@ typedef struct grpc_deadline_state {
grpc_deadline_timer_state timer_state;
grpc_timer timer;
grpc_closure timer_callback;
- // Closure to invoke when the call is complete.
+ // Closure to invoke when we receive trailing metadata.
// We use this to cancel the timer.
- grpc_closure on_complete;
- // The original on_complete closure, which we chain to after our own
- // closure is invoked.
- grpc_closure* next_on_complete;
+ grpc_closure recv_trailing_metadata_ready;
+ // The original recv_trailing_metadata_ready closure, which we chain to
+ // after our own closure is invoked.
+ grpc_closure* original_recv_trailing_metadata_ready;
} grpc_deadline_state;
//
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index fe7f2ba705..4585892fc0 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -56,8 +56,8 @@ struct call_data {
grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops.
grpc_metadata_batch* recv_trailing_metadata;
- grpc_closure* original_recv_trailing_metadata_on_complete;
- grpc_closure recv_trailing_metadata_on_complete;
+ grpc_closure* original_recv_trailing_metadata_ready;
+ grpc_closure recv_trailing_metadata_ready;
// State for handling send_message ops.
grpc_transport_stream_op_batch* send_message_batch;
size_t send_message_bytes_read;
@@ -154,8 +154,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error);
}
-static void recv_trailing_metadata_on_complete(void* user_data,
- grpc_error* error) {
+static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error == GRPC_ERROR_NONE) {
@@ -164,7 +163,7 @@ static void recv_trailing_metadata_on_complete(void* user_data,
} else {
GRPC_ERROR_REF(error);
}
- GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_on_complete, error);
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
}
static void send_message_on_complete(void* arg, grpc_error* error) {
@@ -313,8 +312,10 @@ static void hc_start_transport_stream_op_batch(
/* substitute our callback for the higher callback */
calld->recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
- calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
- batch->on_complete = &calld->recv_trailing_metadata_on_complete;
+ calld->original_recv_trailing_metadata_ready =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready;
}
grpc_error* error = GRPC_ERROR_NONE;
@@ -421,8 +422,8 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
- recv_trailing_metadata_on_complete, elem,
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
elem, grpc_schedule_on_exec_ctx);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 1a924e66e6..365a24785c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1148,12 +1148,10 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
}
}
-/* Flag that this closure barrier wants stats to be updated before finishing */
-#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
/* Flag that this closure barrier may be covering a write in a pollset, and so
we should not complete this closure until we can prove that the write got
scheduled */
-#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
/* First bit of the reference count, stored in the high order bits (with the low
bits being used for flags defined above) */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
@@ -1205,10 +1203,6 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
grpc_error_add_child(closure->error_data.error, error);
}
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
- if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) {
- grpc_transport_move_stats(&s->stats, s->collecting_stats);
- s->collecting_stats = nullptr;
- }
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
GRPC_CLOSURE_RUN(closure, closure->error_data.error);
@@ -1350,9 +1344,14 @@ static void perform_stream_op_locked(void* stream_op,
}
grpc_closure* on_complete = op->on_complete;
+ // TODO(roth): This is a hack needed because we use data inside of the
+ // closure itself to do the barrier calculation (i.e., to ensure that
+ // we don't schedule the closure until all ops in the batch have been
+ // completed). This can go away once we move to a new C++ closure API
+ // that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
- on_complete =
- GRPC_CLOSURE_CREATE(do_nothing, nullptr, grpc_schedule_on_exec_ctx);
+ on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
+ nullptr, grpc_schedule_on_exec_ctx);
}
/* use final_data as a barrier until enqueue time; the inital counter is
@@ -1360,12 +1359,6 @@ static void perform_stream_op_locked(void* stream_op,
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error_data.error = GRPC_ERROR_NONE;
- if (op->collect_stats) {
- GPR_ASSERT(s->collecting_stats == nullptr);
- s->collecting_stats = op_payload->collect_stats.collect_stats;
- on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
- }
-
if (op->cancel_stream) {
GRPC_STATS_INC_HTTP2_OP_CANCEL();
grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
@@ -1599,8 +1592,11 @@ static void perform_stream_op_locked(void* stream_op,
if (op->recv_trailing_metadata) {
GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
+ GPR_ASSERT(s->collecting_stats == nullptr);
+ s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
- s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
+ s->recv_trailing_metadata_finished =
+ op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
s->recv_trailing_metadata =
op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true;
@@ -1959,11 +1955,12 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
}
if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
s->recv_trailing_metadata_finished != nullptr) {
+ grpc_transport_move_stats(&s->stats, s->collecting_stats);
+ s->collecting_stats = nullptr;
grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
s->recv_trailing_metadata);
- grpc_chttp2_complete_closure_step(
- t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
- "recv_trailing_metadata_finished");
+ null_then_run_closure(&s->recv_trailing_metadata_finished,
+ GRPC_ERROR_NONE);
}
}
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 5d614c1d05..436997a42e 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -925,6 +925,10 @@ static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
result = false;
}
/* Check if every op that was asked for is done. */
+ /* TODO(muxi): We should not consider the recv ops here, since they
+ * have their own callbacks. We should invoke a batch's on_complete
+ * as soon as all of the batch's send ops are complete, even if
+ * there are still recv ops pending. */
else if (curr_op->send_initial_metadata &&
!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
@@ -1280,12 +1284,20 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
op_can_be_run(stream_op, s, &oas->state,
OP_RECV_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
- if (oas->s->state.rs.trailing_metadata_valid) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ error = GRPC_ERROR_REF(stream_state->cancel_error);
+ } else if (stream_state->state_op_done[OP_FAILED]) {
+ error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
+ } else if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.trailing_metadata,
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
+ GRPC_CLOSURE_SCHED(
+ stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ error);
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_op->cancel_stream &&
@@ -1398,6 +1410,11 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
+ if (op->recv_trailing_metadata) {
+ GRPC_CLOSURE_SCHED(
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_CANCELLED);
+ }
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
return;
}
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 6e5aa5a46b..ac83c7c64d 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -120,7 +120,6 @@ typedef struct inproc_stream {
struct inproc_stream* stream_list_next;
} inproc_stream;
-static grpc_closure do_nothing_closure;
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
static void op_state_machine(void* arg, grpc_error* error);
@@ -373,6 +372,10 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
const char* msg) {
int is_sm = static_cast<int>(op == s->send_message_op);
int is_stm = static_cast<int>(op == s->send_trailing_md_op);
+ // TODO(vjpai): We should not consider the recv ops here, since they
+ // have their own callbacks. We should invoke a batch's on_complete
+ // as soon as all of the batch's send ops are complete, even if there
+ // are still recv ops pending.
int is_rim = static_cast<int>(op == s->recv_initial_md_op);
int is_rm = static_cast<int>(op == s->recv_message_op);
int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
@@ -496,6 +499,11 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
s->send_trailing_md_op = nullptr;
}
if (s->recv_trailing_md_op) {
+ INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
+ s, error);
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error));
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
s, error);
complete_if_batch_end_locked(
@@ -639,6 +647,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
s->trailing_md_sent = true;
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO,
+ "op_state_machine %p scheduling trailing-metadata-ready", s);
+ GRPC_CLOSURE_SCHED(
+ s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_NONE);
+ INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete", s);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
@@ -711,6 +725,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
}
if (s->recv_trailing_md_op && s->t->is_client && other &&
other->send_message_op) {
+ INPROC_LOG(GPR_INFO,
+ "op_state_machine %p scheduling trailing-metadata-ready %p", s,
+ GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_NONE);
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
}
if (s->to_read_trailing_md_filled) {
@@ -766,6 +786,10 @@ static void op_state_machine(void* arg, grpc_error* error) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err);
+ GRPC_CLOSURE_SCHED(
+ s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(new_err));
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = nullptr;
@@ -859,6 +883,9 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(s->cancel_self_error));
complete_if_batch_end_locked(
s, s->cancel_self_error, s->recv_trailing_md_op,
"cancel_stream scheduling trailing-md-on-complete");
@@ -873,6 +900,8 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
return ret;
}
+static void do_nothing(void* arg, grpc_error* error) {}
+
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@@ -892,8 +921,14 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
}
grpc_error* error = GRPC_ERROR_NONE;
grpc_closure* on_complete = op->on_complete;
+ // TODO(roth): This is a hack needed because we use data inside of the
+ // closure itself to do the barrier calculation (i.e., to ensure that
+ // we don't schedule the closure until all ops in the batch have been
+ // completed). This can go away once we move to a new C++ closure API
+ // that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
- on_complete = &do_nothing_closure;
+ on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
+ nullptr, grpc_schedule_on_exec_ctx);
}
if (op->cancel_stream) {
@@ -1026,6 +1061,15 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
+ if (op->recv_trailing_metadata) {
+ INPROC_LOG(
+ GPR_INFO,
+ "perform_stream_op error %p scheduling trailing-metadata-ready %p",
+ s, error);
+ GRPC_CLOSURE_SCHED(
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error));
+ }
}
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
error);
@@ -1129,12 +1173,8 @@ static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
/*******************************************************************************
* GLOBAL INIT AND DESTROY
*/
-static void do_nothing(void* arg, grpc_error* error) {}
-
void grpc_inproc_transport_init(void) {
grpc_core::ExecCtx exec_ctx;
- GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
- grpc_schedule_on_exec_ctx);
g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
grpc_slice key_tmp = grpc_slice_from_static_string(":path");
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index ddd3029402..e2ea334ded 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -51,6 +51,7 @@ typedef struct connected_channel_call_data {
callback_state on_complete[6]; // Max number of pending batches.
callback_state recv_initial_metadata_ready;
callback_state recv_message_ready;
+ callback_state recv_trailing_metadata_ready;
} call_data;
static void run_in_call_combiner(void* arg, grpc_error* error) {
@@ -111,6 +112,12 @@ static void con_start_transport_stream_op_batch(
intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
+ if (batch->recv_trailing_metadata) {
+ callback_state* state = &calld->recv_trailing_metadata_ready;
+ intercept_callback(
+ calld, state, false, "recv_trailing_metadata_ready",
+ &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
+ }
if (batch->cancel_stream) {
// There can be more than one cancellation batch in flight at any
// given time, so we can't just pick out a fixed index into
@@ -121,7 +128,7 @@ static void con_start_transport_stream_op_batch(
static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
intercept_callback(calld, state, true, "on_complete (cancel_stream)",
&batch->on_complete);
- } else {
+ } else if (batch->on_complete != nullptr) {
callback_state* state = get_state_for_batch(calld, batch);
intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
}
diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h
index f36f6cb706..0d2586e507 100644
--- a/src/core/lib/gprpp/inlined_vector.h
+++ b/src/core/lib/gprpp/inlined_vector.h
@@ -99,6 +99,8 @@ class InlinedVector {
void push_back(T&& value) { emplace_back(std::move(value)); }
size_t size() const { return size_; }
+ bool empty() const { return size_ == 0; }
+
size_t capacity() const { return capacity_; }
void clear() {
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index 0ccd08ea57..641fa18082 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -26,6 +26,7 @@
#include <grpc/support/atm.h>
#include "src/core/lib/gpr/mpscq.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/iomgr/closure.h"
// A simple, lock-free mechanism for serializing activity related to a
@@ -109,4 +110,83 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
grpc_error* error);
+namespace grpc_core {
+
+// Helper for running a list of closures in a call combiner.
+//
+// Each callback running in the call combiner will eventually be
+// returned to the surface, at which point the surface will yield the
+// call combiner. So when we are running in the call combiner and have
+// more than one callback to return to the surface, we need to re-enter
+// the call combiner for all but one of those callbacks.
+class CallCombinerClosureList {
+ public:
+ CallCombinerClosureList() {}
+
+ // Adds a closure to the list. The closure must eventually result in
+ // the call combiner being yielded.
+ void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
+ closures_.emplace_back(closure, error, reason);
+ }
+
+ // Runs all closures in the call combiner and yields the call combiner.
+ //
+ // All but one of the closures in the list will be scheduled via
+ // GRPC_CALL_COMBINER_START(), and the remaining closure will be
+ // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
+ // yielding the call combiner. If the list is empty, then the call
+ // combiner will be yielded immediately.
+ void RunClosures(grpc_call_combiner* call_combiner) {
+ if (closures_.empty()) {
+ GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
+ return;
+ }
+ for (size_t i = 1; i < closures_.size(); ++i) {
+ auto& closure = closures_[i];
+ GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+ closure.reason);
+ }
+ if (grpc_call_combiner_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "CallCombinerClosureList executing closure while already "
+ "holding call_combiner %p: closure=%p error=%s reason=%s",
+ call_combiner, closures_[0].closure,
+ grpc_error_string(closures_[0].error), closures_[0].reason);
+ }
+ // This will release the call combiner.
+ GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
+ closures_.clear();
+ }
+
+ // Runs all closures in the call combiner, but does NOT yield the call
+ // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
+ void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
+ for (size_t i = 0; i < closures_.size(); ++i) {
+ auto& closure = closures_[i];
+ GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+ closure.reason);
+ }
+ closures_.clear();
+ }
+
+ size_t size() const { return closures_.size(); }
+
+ private:
+ struct CallCombinerClosure {
+ grpc_closure* closure;
+ grpc_error* error;
+ const char* reason;
+
+ CallCombinerClosure(grpc_closure* closure, grpc_error* error,
+ const char* reason)
+ : closure(closure), error(error), reason(reason) {}
+ };
+
+ // There are generally a maximum of 6 closures to run in the call
+ // combiner, one for each pending op.
+ InlinedVector<CallCombinerClosure, 6> closures_;
+};
+
+} // namespace grpc_core
+
#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 34a494485d..f14c723844 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -283,9 +283,10 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) {
if (c->scheduled) {
gpr_log(GPR_ERROR,
"Closure already scheduled. (closure: %p, created: [%s:%d], "
- "previously scheduled at: [%s: %d] run?: %s",
+ "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
+ "run?: %s",
c, c->file_created, c->line_created, c->file_initiated,
- c->line_initiated, c->run ? "true" : "false");
+ c->line_initiated, file, line, c->run ? "true" : "false");
abort();
}
c->scheduled = true;
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 1cf8ea94e7..8b224b6e7b 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -233,6 +233,7 @@ struct grpc_call {
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
+ grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags;
grpc_closure release_call;
@@ -270,8 +271,17 @@ struct grpc_call {
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
-#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack*)((call) + 1))
-#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call*)(call_stack)) - 1)
+/* Given a size, round up to the next multiple of sizeof(void*) */
+#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
+ (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
+
+#define CALL_STACK_FROM_CALL(call) \
+ (grpc_call_stack*)((char*)(call) + \
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
+#define CALL_FROM_CALL_STACK(call_stack) \
+ (grpc_call*)(((char*)(call_stack)) - \
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
+
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
@@ -342,8 +352,9 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
gpr_arena* arena = gpr_arena_create(initial_size);
- call = static_cast<grpc_call*>(gpr_arena_alloc(
- arena, sizeof(grpc_call) + channel_stack->call_stack_size));
+ call = static_cast<grpc_call*>(
+ gpr_arena_alloc(arena, ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
+ channel_stack->call_stack_size));
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
grpc_call_combiner_init(&call->call_combiner);
@@ -1209,7 +1220,6 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
-
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
@@ -1217,14 +1227,9 @@ static void post_batch_completion(batch_control* bctl) {
}
if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
-
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
- grpc_metadata_batch* md =
- &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(call, md);
-
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
parent_call* pc = get_parent_call(call);
@@ -1246,7 +1251,6 @@ static void post_batch_completion(batch_control* bctl) {
}
gpr_mu_unlock(&pc->child_list_mu);
}
-
if (call->is_client) {
get_final_status(call, set_status_value_directly,
call->final_op.client.status,
@@ -1256,7 +1260,6 @@ static void post_batch_completion(batch_control* bctl) {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
-
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@@ -1538,6 +1541,19 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
finish_batch_step(bctl);
}
+static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
+ batch_control* bctl = static_cast<batch_control*>(bctlp);
+ grpc_call* call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
+ add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_metadata_batch* md =
+ &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+ recv_trailing_filter(call, md);
+ }
+ finish_batch_step(bctl);
+}
+
static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
@@ -1558,7 +1574,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t i;
const grpc_op* op;
batch_control* bctl;
- int num_completion_callbacks_needed = 1;
+ bool has_send_ops = false;
+ int num_recv_ops = 0;
grpc_call_error error = GRPC_CALL_OK;
grpc_transport_stream_op_batch* stream_op;
grpc_transport_stream_op_batch_payload* stream_op_payload;
@@ -1664,6 +1681,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->send_initial_metadata.peer_string =
&call->peer_string;
}
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_MESSAGE: {
@@ -1693,6 +1711,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
&op->data.send_message.send_message->data.raw.slice_buffer, flags);
stream_op_payload->send_message.send_message.reset(
call->sending_stream.get());
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1713,6 +1732,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->sent_final_op = true;
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@@ -1777,6 +1797,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+ has_send_ops = true;
break;
}
case GRPC_OP_RECV_INITIAL_METADATA: {
@@ -1804,7 +1825,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->recv_initial_metadata.peer_string =
&call->peer_string;
}
- num_completion_callbacks_needed++;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_MESSAGE: {
@@ -1826,7 +1847,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
- num_completion_callbacks_needed++;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@@ -1852,11 +1873,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.client.error_string =
op->data.recv_status_on_client.error_string;
stream_op->recv_trailing_metadata = true;
- stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->collect_stats.collect_stats =
+ stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
+ GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+ receiving_trailing_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->receiving_trailing_metadata_ready;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@@ -1877,11 +1903,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
stream_op->recv_trailing_metadata = true;
- stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->collect_stats.collect_stats =
+ stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
+ GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+ receiving_trailing_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->receiving_trailing_metadata_ready;
+ ++num_recv_ops;
break;
}
}
@@ -1891,13 +1922,15 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
- gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
+ gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
- GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op->on_complete = &bctl->finish_batch;
- gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
+ if (has_send_ops) {
+ GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op->on_complete = &bctl->finish_batch;
+ }
+ gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
execute_batch(call, stream_op, &bctl->start_batch);
done:
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index 039d603394..cbdb77c844 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -212,21 +212,32 @@ void grpc_transport_stream_op_batch_finish_with_failure(
if (batch->send_message) {
batch->payload->send_message.send_message.reset();
}
- if (batch->recv_message) {
- GRPC_CALL_COMBINER_START(
- call_combiner, batch->payload->recv_message.recv_message_ready,
- GRPC_ERROR_REF(error), "failing recv_message_ready");
+ if (batch->cancel_stream) {
+ GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
}
+ // Construct a list of closures to execute.
+ grpc_core::CallCombinerClosureList closures;
if (batch->recv_initial_metadata) {
- GRPC_CALL_COMBINER_START(
- call_combiner,
+ closures.Add(
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
}
- GRPC_CLOSURE_SCHED(batch->on_complete, error);
- if (batch->cancel_stream) {
- GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
+ if (batch->recv_message) {
+ closures.Add(batch->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error), "failing recv_message_ready");
+ }
+ if (batch->recv_trailing_metadata) {
+ closures.Add(
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
+ }
+ if (batch->on_complete != nullptr) {
+ closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
+ "failing on_complete");
}
+ // Execute closures.
+ closures.RunClosures(call_combiner);
+ GRPC_ERROR_UNREF(error);
}
typedef struct {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index b2e252d939..585b9dfae9 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -122,9 +122,15 @@ typedef struct grpc_transport_stream_op_batch_payload
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op_batch {
- /** Should be enqueued when all requested operations (excluding recv_message
- and recv_initial_metadata which have their own closures) in a given batch
- have been completed. */
+ /** Should be scheduled when all of the non-recv operations in the batch
+ are complete.
+
+ The recv ops (recv_initial_metadata, recv_message, and
+ recv_trailing_metadata) each have their own callbacks. If a batch
+ contains both recv ops and non-recv ops, on_complete should be
+ scheduled as soon as the non-recv ops are complete, regardless of
+ whether or not the recv ops are complete. If a batch contains
+ only recv ops, on_complete can be null. */
grpc_closure* on_complete;
/** Values for the stream op (fields set are determined by flags above) */
@@ -149,9 +155,6 @@ typedef struct grpc_transport_stream_op_batch {
*/
bool recv_trailing_metadata : 1;
- /** Collect any stats into provided buffer, zero internal stat counters */
- bool collect_stats : 1;
-
/** Cancel this stream with the provided error */
bool cancel_stream : 1;
@@ -219,11 +222,10 @@ struct grpc_transport_stream_op_batch_payload {
struct {
grpc_metadata_batch* recv_trailing_metadata;
- } recv_trailing_metadata;
-
- struct {
grpc_transport_stream_stats* collect_stats;
- } collect_stats;
+ /** Should be enqueued when initial metadata is ready to be processed. */
+ grpc_closure* recv_trailing_metadata_ready;
+ } recv_trailing_metadata;
/** Forcefully close this stream.
The HTTP2 semantics should be:
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 25ab492f3a..8c7db642a5 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -120,13 +120,6 @@ char* grpc_transport_stream_op_batch_string(
gpr_strvec_add(&b, tmp);
}
- if (op->collect_stats) {
- gpr_strvec_add(&b, gpr_strdup(" "));
- gpr_asprintf(&tmp, "COLLECT_STATS:%p",
- op->payload->collect_stats.collect_stats);
- gpr_strvec_add(&b, tmp);
- }
-
out = gpr_strvec_flatten(&b, nullptr);
gpr_strvec_destroy(&b);
diff --git a/test/core/gprpp/inlined_vector_test.cc b/test/core/gprpp/inlined_vector_test.cc
index ae34947718..41f4338f8a 100644
--- a/test/core/gprpp/inlined_vector_test.cc
+++ b/test/core/gprpp/inlined_vector_test.cc
@@ -27,10 +27,12 @@ namespace testing {
TEST(InlinedVectorTest, CreateAndIterate) {
const int kNumElements = 9;
InlinedVector<int, 2> v;
+ EXPECT_TRUE(v.empty());
for (int i = 0; i < kNumElements; ++i) {
v.push_back(i);
}
EXPECT_EQ(static_cast<size_t>(kNumElements), v.size());
+ EXPECT_FALSE(v.empty());
for (int i = 0; i < kNumElements; ++i) {
EXPECT_EQ(i, v[i]);
EXPECT_EQ(i, &v[i] - &v[0]); // Ensure contiguous allocation.
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index 831b29c506..dd1610dc3d 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -621,18 +621,26 @@ typedef struct {
static void StartTransportStreamOp(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
call_data* calld = static_cast<call_data*>(elem->call_data);
+ // Construct list of closures to return.
+ grpc_core::CallCombinerClosureList closures;
if (op->recv_initial_metadata) {
- GRPC_CALL_COMBINER_START(
- calld->call_combiner,
- op->payload->recv_initial_metadata.recv_initial_metadata_ready,
- GRPC_ERROR_NONE, "recv_initial_metadata");
+ closures.Add(op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE, "recv_initial_metadata");
}
if (op->recv_message) {
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- op->payload->recv_message.recv_message_ready,
- GRPC_ERROR_NONE, "recv_message");
+ closures.Add(op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE,
+ "recv_message");
}
- GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_NONE);
+ if (op->recv_trailing_metadata) {
+ closures.Add(
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_NONE, "recv_trailing_metadata");
+ }
+ if (op->on_complete != nullptr) {
+ closures.Add(op->on_complete, GRPC_ERROR_NONE, "on_complete");
+ }
+ // Execute closures.
+ closures.RunClosures(calld->call_combiner);
}
static void StartTransportOp(grpc_channel_element* elem,