aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc891
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.cc28
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.h10
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.cc19
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc35
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.cc19
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc52
-rw-r--r--src/core/lib/channel/connected_channel.cc9
-rw-r--r--src/core/lib/gprpp/inlined_vector.h2
-rw-r--r--src/core/lib/iomgr/call_combiner.h80
-rw-r--r--src/core/lib/iomgr/closure.h5
-rw-r--r--src/core/lib/surface/call.cc81
-rw-r--r--src/core/lib/transport/transport.cc29
-rw-r--r--src/core/lib/transport/transport.h22
-rw-r--r--src/core/lib/transport/transport_op_string.cc7
-rw-r--r--test/core/gprpp/inlined_vector_test.cc2
-rw-r--r--test/cpp/microbenchmarks/bm_call_create.cc24
17 files changed, 606 insertions, 709 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 34ea97e23e..ea6775a8d8 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -817,7 +817,6 @@ typedef struct {
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
- grpc_closure recv_trailing_metadata_ready;
// For intercepting on_complete.
grpc_closure on_complete;
} subchannel_batch_data;
@@ -1193,24 +1192,35 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
"chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
elem->channel_data, calld, num_batches, grpc_error_string(error));
}
- grpc_core::CallCombinerClosureList closures;
+ grpc_transport_stream_op_batch*
+ batches[GPR_ARRAY_SIZE(calld->pending_batches)];
+ size_t num_batches = 0;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batch->handler_private.extra_arg = calld;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- fail_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
- "pending_batches_fail");
+ batches[num_batches++] = batch;
pending_batch_clear(calld, pending);
}
}
+ for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
+ grpc_transport_stream_op_batch* batch = batches[i];
+ batch->handler_private.extra_arg = calld;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ fail_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(calld->call_combiner,
+ &batch->handler_private.closure,
+ GRPC_ERROR_REF(error), "pending_batches_fail");
+ }
if (yield_call_combiner) {
- closures.RunClosures(calld->call_combiner);
- } else {
- closures.RunClosuresWithoutYielding(calld->call_combiner);
+ if (num_batches > 0) {
+ // Note: This will release the call combiner.
+ grpc_transport_stream_op_batch_finish_with_failure(
+ batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
+ } else {
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
+ }
}
GRPC_ERROR_UNREF(error);
}
@@ -1245,22 +1255,30 @@ static void pending_batches_resume(grpc_call_element* elem) {
" pending batches on subchannel_call=%p",
chand, calld, num_batches, calld->subchannel_call);
}
- grpc_core::CallCombinerClosureList closures;
+ grpc_transport_stream_op_batch*
+ batches[GPR_ARRAY_SIZE(calld->pending_batches)];
+ size_t num_batches = 0;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batch->handler_private.extra_arg = calld->subchannel_call;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- resume_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
- "pending_batches_resume");
+ batches[num_batches++] = batch;
pending_batch_clear(calld, pending);
}
}
+ for (size_t i = 1; i < num_batches; ++i) {
+ grpc_transport_stream_op_batch* batch = batches[i];
+ batch->handler_private.extra_arg = calld->subchannel_call;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ resume_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(calld->call_combiner,
+ &batch->handler_private.closure, GRPC_ERROR_NONE,
+ "pending_batches_resume");
+ }
+ GPR_ASSERT(num_batches > 0);
// Note: This will release the call combiner.
- closures.RunClosures(calld->call_combiner);
+ grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
}
static void maybe_clear_pending_batch(grpc_call_element* elem,
@@ -1275,10 +1293,7 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
nullptr) &&
(!batch->recv_message ||
- batch->payload->recv_message.recv_message_ready == nullptr) &&
- (!batch->recv_trailing_metadata ||
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
- nullptr)) {
+ batch->payload->recv_message.recv_message_ready == nullptr)) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
calld);
@@ -1287,27 +1302,75 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
}
}
-// Returns a pointer to the first pending batch for which predicate(batch)
-// returns true, or null if not found.
-template <typename Predicate>
-static pending_batch* pending_batch_find(grpc_call_element* elem,
- const char* log_message,
- Predicate predicate) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- grpc_transport_stream_op_batch* batch = pending->batch;
- if (batch != nullptr && predicate(batch)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
- calld, log_message, i);
- }
- return pending;
- }
+// Returns true if all ops in the pending batch have been completed.
+static bool pending_batch_is_completed(
+ pending_batch* pending, call_data* calld,
+ subchannel_call_retry_state* retry_state) {
+ if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+ return false;
+ }
+ if (pending->batch->send_initial_metadata &&
+ !retry_state->completed_send_initial_metadata) {
+ return false;
+ }
+ if (pending->batch->send_message &&
+ retry_state->completed_send_message_count <
+ calld->send_messages->size()) {
+ return false;
+ }
+ if (pending->batch->send_trailing_metadata &&
+ !retry_state->completed_send_trailing_metadata) {
+ return false;
+ }
+ if (pending->batch->recv_initial_metadata &&
+ !retry_state->completed_recv_initial_metadata) {
+ return false;
+ }
+ if (pending->batch->recv_message &&
+ retry_state->completed_recv_message_count <
+ retry_state->started_recv_message_count) {
+ return false;
+ }
+ if (pending->batch->recv_trailing_metadata &&
+ !retry_state->completed_recv_trailing_metadata) {
+ return false;
}
- return nullptr;
+ return true;
+}
+
+// Returns true if any op in the batch was not yet started.
+static bool pending_batch_is_unstarted(
+ pending_batch* pending, call_data* calld,
+ subchannel_call_retry_state* retry_state) {
+ if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+ return false;
+ }
+ if (pending->batch->send_initial_metadata &&
+ !retry_state->started_send_initial_metadata) {
+ return true;
+ }
+ if (pending->batch->send_message &&
+ retry_state->started_send_message_count < calld->send_messages->size()) {
+ return true;
+ }
+ if (pending->batch->send_trailing_metadata &&
+ !retry_state->started_send_trailing_metadata) {
+ return true;
+ }
+ if (pending->batch->recv_initial_metadata &&
+ !retry_state->started_recv_initial_metadata) {
+ return true;
+ }
+ if (pending->batch->recv_message &&
+ retry_state->completed_recv_message_count ==
+ retry_state->started_recv_message_count) {
+ return true;
+ }
+ if (pending->batch->recv_trailing_metadata &&
+ !retry_state->started_recv_trailing_metadata) {
+ return true;
+ }
+ return false;
}
//
@@ -1494,13 +1557,8 @@ static bool maybe_retry(grpc_call_element* elem,
// subchannel_batch_data
//
-// Creates a subchannel_batch_data object on the call's arena with the
-// specified refcount. If set_on_complete is true, the batch's
-// on_complete callback will be set to point to on_complete();
-// otherwise, the batch's on_complete callback will be null.
static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
- int refcount,
- bool set_on_complete) {
+ int refcount) {
call_data* calld = static_cast<call_data*>(elem->call_data);
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
@@ -1513,11 +1571,9 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
batch_data->batch.payload = &retry_state->batch_payload;
gpr_ref_init(&batch_data->refs, refcount);
- if (set_on_complete) {
- GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
- grpc_schedule_on_exec_ctx);
- batch_data->batch.on_complete = &batch_data->on_complete;
- }
+ GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
+ grpc_schedule_on_exec_ctx);
+ batch_data->batch.on_complete = &batch_data->on_complete;
GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
return batch_data;
}
@@ -1550,14 +1606,26 @@ static void batch_data_unref(subchannel_batch_data* batch_data) {
static void invoke_recv_initial_metadata_callback(void* arg,
grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+ channel_data* chand =
+ static_cast<channel_data*>(batch_data->elem->channel_data);
+ call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending batch.
- pending_batch* pending = pending_batch_find(
- batch_data->elem, "invoking recv_initial_metadata_ready for",
- [](grpc_transport_stream_op_batch* batch) {
- return batch->recv_initial_metadata &&
- batch->payload->recv_initial_metadata
- .recv_initial_metadata_ready != nullptr;
- });
+ pending_batch* pending = nullptr;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
+ if (batch != nullptr && batch->recv_initial_metadata &&
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
+ nullptr) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
+ "pending batch at index %" PRIuPTR,
+ chand, calld, i);
+ }
+ pending = &calld->pending_batches[i];
+ break;
+ }
+ }
GPR_ASSERT(pending != nullptr);
// Return metadata.
grpc_metadata_batch_move(
@@ -1593,19 +1661,10 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
- retry_state->completed_recv_initial_metadata = true;
- // If a retry was already dispatched, then we're not going to use the
- // result of this recv_initial_metadata op, so do nothing.
- if (retry_state->retry_dispatched) {
- GRPC_CALL_COMBINER_STOP(
- calld->call_combiner,
- "recv_initial_metadata_ready after retry dispatched");
- return;
- }
// If we got an error or a Trailers-Only response and have not yet gotten
- // the recv_trailing_metadata_ready callback, then defer propagating this
- // callback back to the surface. We can evaluate whether to retry when
- // recv_trailing_metadata comes back.
+ // the recv_trailing_metadata on_complete callback, then defer
+ // propagating this callback back to the surface. We can evaluate whether
+ // to retry when recv_trailing_metadata comes back.
if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1630,9 +1689,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
}
// Received valid initial metadata, so commit the call.
retry_commit(elem, retry_state);
- // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_initial_metadata_callback(batch_data, error);
+ GRPC_ERROR_UNREF(error);
}
//
@@ -1642,13 +1701,25 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
// Invokes recv_message_ready for a subchannel batch.
static void invoke_recv_message_callback(void* arg, grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+ channel_data* chand =
+ static_cast<channel_data*>(batch_data->elem->channel_data);
+ call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending op.
- pending_batch* pending = pending_batch_find(
- batch_data->elem, "invoking recv_message_ready for",
- [](grpc_transport_stream_op_batch* batch) {
- return batch->recv_message &&
- batch->payload->recv_message.recv_message_ready != nullptr;
- });
+ pending_batch* pending = nullptr;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
+ if (batch != nullptr && batch->recv_message &&
+ batch->payload->recv_message.recv_message_ready != nullptr) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: invoking recv_message_ready for "
+ "pending batch at index %" PRIuPTR,
+ chand, calld, i);
+ }
+ pending = &calld->pending_batches[i];
+ break;
+ }
+ }
GPR_ASSERT(pending != nullptr);
// Return payload.
*pending->batch->payload->recv_message.recv_message =
@@ -1680,18 +1751,10 @@ static void recv_message_ready(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
- ++retry_state->completed_recv_message_count;
- // If a retry was already dispatched, then we're not going to use the
- // result of this recv_message op, so do nothing.
- if (retry_state->retry_dispatched) {
- GRPC_CALL_COMBINER_STOP(calld->call_combiner,
- "recv_message_ready after retry dispatched");
- return;
- }
// If we got an error or the payload was nullptr and we have not yet gotten
- // the recv_trailing_metadata_ready callback, then defer propagating this
- // callback back to the surface. We can evaluate whether to retry when
- // recv_trailing_metadata comes back.
+ // the recv_trailing_metadata on_complete callback, then defer
+ // propagating this callback back to the surface. We can evaluate whether
+ // to retry when recv_trailing_metadata comes back.
if (GPR_UNLIKELY(
(batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1714,268 +1777,133 @@ static void recv_message_ready(void* arg, grpc_error* error) {
}
// Received a valid message, so commit the call.
retry_commit(elem, retry_state);
- // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_message_callback(batch_data, error);
+ GRPC_ERROR_UNREF(error);
}
//
-// recv_trailing_metadata handling
+// list of closures to execute in call combiner
//
-// Sets *status and *server_pushback_md based on batch_data and error.
-static void get_call_status(subchannel_batch_data* batch_data,
- grpc_error* error, grpc_status_code* status,
- grpc_mdelem** server_pushback_md) {
- grpc_call_element* elem = batch_data->elem;
+// Represents a closure that needs to run in the call combiner as part of
+// starting or completing a batch.
+typedef struct {
+ grpc_closure* closure;
+ grpc_error* error;
+ const char* reason;
+ bool free_reason = false;
+} closure_to_execute;
+
+static void execute_closures_in_call_combiner(grpc_call_element* elem,
+ const char* caller,
+ closure_to_execute* closures,
+ size_t num_closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
- if (error != GRPC_ERROR_NONE) {
- grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
- nullptr);
+ // Note that the call combiner will be yielded for each closure that
+ // we schedule. We're already running in the call combiner, so one of
+ // the closures can be scheduled directly, but the others will
+ // have to re-enter the call combiner.
+ if (num_closures > 0) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
+ calld, caller, closures[0].reason);
+ }
+ GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
+ if (closures[0].free_reason) {
+ gpr_free(const_cast<char*>(closures[0].reason));
+ }
+ for (size_t i = 1; i < num_closures; ++i) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: %s starting closure in call combiner: %s",
+ chand, calld, caller, closures[i].reason);
+ }
+ GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
+ closures[i].error, closures[i].reason);
+ if (closures[i].free_reason) {
+ gpr_free(const_cast<char*>(closures[i].reason));
+ }
+ }
} else {
- grpc_metadata_batch* md_batch =
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata;
- GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
- *status =
- grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
- if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
- *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
+ calld, caller);
}
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
}
- GRPC_ERROR_UNREF(error);
}
-// Adds recv_trailing_metadata_ready closure to closures.
-static void add_closure_for_recv_trailing_metadata_ready(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
- // Find pending batch.
- pending_batch* pending = pending_batch_find(
- elem, "invoking recv_trailing_metadata for",
- [](grpc_transport_stream_op_batch* batch) {
- return batch->recv_trailing_metadata &&
- batch->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready != nullptr;
- });
- // If we generated the recv_trailing_metadata op internally via
- // start_internal_recv_trailing_metadata(), then there will be no
- // pending batch.
- if (pending == nullptr) {
- GRPC_ERROR_UNREF(error);
- return;
+//
+// on_complete callback handling
+//
+
+// Updates retry_state to reflect the ops completed in batch_data.
+static void update_retry_state_for_completed_batch(
+ subchannel_batch_data* batch_data,
+ subchannel_call_retry_state* retry_state) {
+ if (batch_data->batch.send_initial_metadata) {
+ retry_state->completed_send_initial_metadata = true;
+ }
+ if (batch_data->batch.send_message) {
+ ++retry_state->completed_send_message_count;
+ }
+ if (batch_data->batch.send_trailing_metadata) {
+ retry_state->completed_send_trailing_metadata = true;
+ }
+ if (batch_data->batch.recv_initial_metadata) {
+ retry_state->completed_recv_initial_metadata = true;
+ }
+ if (batch_data->batch.recv_message) {
+ ++retry_state->completed_recv_message_count;
+ }
+ if (batch_data->batch.recv_trailing_metadata) {
+ retry_state->completed_recv_trailing_metadata = true;
}
- // Return metadata.
- grpc_metadata_batch_move(
- &batch_data->recv_trailing_metadata,
- pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
- // Add closure.
- closures->Add(pending->batch->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready,
- error, "recv_trailing_metadata_ready for pending batch");
- // Update bookkeeping.
- pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- nullptr;
- maybe_clear_pending_batch(elem, pending);
}
// Adds any necessary closures for deferred recv_initial_metadata and
-// recv_message callbacks to closures.
+// recv_message callbacks to closures, updating *num_closures as needed.
static void add_closures_for_deferred_recv_callbacks(
subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
- grpc_core::CallCombinerClosureList* closures) {
+ closure_to_execute* closures, size_t* num_closures) {
if (batch_data->batch.recv_trailing_metadata) {
// Add closure for deferred recv_initial_metadata_ready.
if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
nullptr)) {
- GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
- invoke_recv_initial_metadata_callback,
- retry_state->recv_initial_metadata_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closures->Add(&batch_data->recv_initial_metadata_ready,
- retry_state->recv_initial_metadata_error,
- "resuming recv_initial_metadata_ready");
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->recv_initial_metadata_ready,
+ invoke_recv_initial_metadata_callback,
+ retry_state->recv_initial_metadata_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closure->error = retry_state->recv_initial_metadata_error;
+ closure->reason = "resuming recv_initial_metadata_ready";
retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
}
// Add closure for deferred recv_message_ready.
if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
nullptr)) {
- GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
- invoke_recv_message_callback,
- retry_state->recv_message_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closures->Add(&batch_data->recv_message_ready,
- retry_state->recv_message_error,
- "resuming recv_message_ready");
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->recv_message_ready, invoke_recv_message_callback,
+ retry_state->recv_message_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closure->error = retry_state->recv_message_error;
+ closure->reason = "resuming recv_message_ready";
retry_state->recv_message_ready_deferred_batch = nullptr;
}
}
}
-// Returns true if any op in the batch was not yet started.
-// Only looks at send ops, since recv ops are always started immediately.
-static bool pending_batch_is_unstarted(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
- if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
- return false;
- }
- if (pending->batch->send_initial_metadata &&
- !retry_state->started_send_initial_metadata) {
- return true;
- }
- if (pending->batch->send_message &&
- retry_state->started_send_message_count < calld->send_messages->size()) {
- return true;
- }
- if (pending->batch->send_trailing_metadata &&
- !retry_state->started_send_trailing_metadata) {
- return true;
- }
- return false;
-}
-
-// For any pending batch containing an op that has not yet been started,
-// adds the pending batch's completion closures to closures.
-static void add_closures_to_fail_unstarted_pending_batches(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_unstarted(pending, calld, retry_state)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: failing unstarted pending batch at index "
- "%" PRIuPTR,
- chand, calld, i);
- }
- closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
- "failing on_complete for pending batch");
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
- }
- }
- GRPC_ERROR_UNREF(error);
-}
-
-// Runs necessary closures upon completion of a call attempt.
-static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
- grpc_error* error) {
- grpc_call_element* elem = batch_data->elem;
- call_data* calld = static_cast<call_data*>(elem->call_data);
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- grpc_connected_subchannel_call_get_parent_data(
- batch_data->subchannel_call));
- // Construct list of closures to execute.
- grpc_core::CallCombinerClosureList closures;
- // First, add closure for recv_trailing_metadata_ready.
- add_closure_for_recv_trailing_metadata_ready(
- elem, batch_data, GRPC_ERROR_REF(error), &closures);
- // If there are deferred recv_initial_metadata_ready or recv_message_ready
- // callbacks, add them to closures.
- add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
- // Add closures to fail any pending batches that have not yet been started.
- add_closures_to_fail_unstarted_pending_batches(
- elem, retry_state, GRPC_ERROR_REF(error), &closures);
- // Don't need batch_data anymore.
- batch_data_unref(batch_data);
- // Schedule all of the closures identified above.
- // Note: This will release the call combiner.
- closures.RunClosures(calld->call_combiner);
- GRPC_ERROR_UNREF(error);
-}
-
-// Intercepts recv_trailing_metadata_ready callback for retries.
-// Commits the call and returns the trailing metadata up the stack.
-static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
- subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
- grpc_call_element* elem = batch_data->elem;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
- chand, calld, grpc_error_string(error));
- }
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- grpc_connected_subchannel_call_get_parent_data(
- batch_data->subchannel_call));
- retry_state->completed_recv_trailing_metadata = true;
- // Get the call's status and check for server pushback metadata.
- grpc_status_code status = GRPC_STATUS_OK;
- grpc_mdelem* server_pushback_md = nullptr;
- get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
- &server_pushback_md);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
- calld, grpc_status_code_to_string(status));
- }
- // Check if we should retry.
- if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
- // Unref batch_data for deferred recv_initial_metadata_ready or
- // recv_message_ready callbacks, if any.
- if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
- }
- if (retry_state->recv_message_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_message_error);
- }
- batch_data_unref(batch_data);
- return;
- }
- // Not retrying, so commit the call.
- retry_commit(elem, retry_state);
- // Run any necessary closures.
- run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
-}
-
-//
-// on_complete callback handling
-//
-
-// Adds the on_complete closure for the pending batch completed in
-// batch_data to closures.
-static void add_closure_for_completed_pending_batch(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, grpc_error* error,
- grpc_core::CallCombinerClosureList* closures) {
- pending_batch* pending = pending_batch_find(
- elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
- // Match the pending batch with the same set of send ops as the
- // subchannel batch we've just completed.
- return batch->on_complete != nullptr &&
- batch_data->batch.send_initial_metadata ==
- batch->send_initial_metadata &&
- batch_data->batch.send_message == batch->send_message &&
- batch_data->batch.send_trailing_metadata ==
- batch->send_trailing_metadata;
- });
- // If batch_data is a replay batch, then there will be no pending
- // batch to complete.
- if (pending == nullptr) {
- GRPC_ERROR_UNREF(error);
- return;
- }
- // Add closure.
- closures->Add(pending->batch->on_complete, error,
- "on_complete for pending batch");
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
-}
-
// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
-// start_retriable_subchannel_batches().
+// start_retriable_subchannel_batches(), updating *num_closures as needed.
static void add_closures_for_replay_or_pending_send_ops(
grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state,
- grpc_core::CallCombinerClosureList* closures) {
+ subchannel_call_retry_state* retry_state, closure_to_execute* closures,
+ size_t* num_closures) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
bool have_pending_send_message_ops =
@@ -2001,12 +1929,93 @@ static void add_closures_for_replay_or_pending_send_ops(
"chand=%p calld=%p: starting next batch for pending send op(s)",
chand, calld);
}
- GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
- start_retriable_subchannel_batches, elem,
- grpc_schedule_on_exec_ctx);
- closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
- "starting next batch for send_* op(s)");
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->batch.handler_private.closure,
+ start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
+ closure->error = GRPC_ERROR_NONE;
+ closure->reason = "starting next batch for send_* op(s)";
+ }
+}
+
+// For any pending batch completed in batch_data, adds the necessary
+// completion closures to closures, updating *num_closures as needed.
+static void add_closures_for_completed_pending_batches(
+ grpc_call_element* elem, subchannel_batch_data* batch_data,
+ subchannel_call_retry_state* retry_state, grpc_error* error,
+ closure_to_execute* closures, size_t* num_closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ if (pending_batch_is_completed(pending, calld, retry_state)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
+ chand, calld, i);
+ }
+ // Copy the trailing metadata to return it to the surface.
+ if (batch_data->batch.recv_trailing_metadata) {
+ grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
+ pending->batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata);
+ }
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = pending->batch->on_complete;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason = "on_complete for pending batch";
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+ }
}
+ GRPC_ERROR_UNREF(error);
+}
+
+// For any pending batch containing an op that has not yet been started,
+// adds the pending batch's completion closures to closures, updating
+// *num_closures as needed.
+static void add_closures_to_fail_unstarted_pending_batches(
+ grpc_call_element* elem, subchannel_call_retry_state* retry_state,
+ grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ if (pending_batch_is_unstarted(pending, calld, retry_state)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: failing unstarted pending batch at index "
+ "%" PRIuPTR,
+ chand, calld, i);
+ }
+ if (pending->batch->recv_initial_metadata) {
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = pending->batch->payload->recv_initial_metadata
+ .recv_initial_metadata_ready;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason =
+ "failing recv_initial_metadata_ready for pending batch";
+ pending->batch->payload->recv_initial_metadata
+ .recv_initial_metadata_ready = nullptr;
+ }
+ if (pending->batch->recv_message) {
+ *pending->batch->payload->recv_message.recv_message = nullptr;
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure =
+ pending->batch->payload->recv_message.recv_message_ready;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason = "failing recv_message_ready for pending batch";
+ pending->batch->payload->recv_message.recv_message_ready = nullptr;
+ }
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = pending->batch->on_complete;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason = "failing on_complete for pending batch";
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+ }
+ }
+ GRPC_ERROR_UNREF(error);
}
// Callback used to intercept on_complete from subchannel calls.
@@ -2026,49 +2035,136 @@ static void on_complete(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
+ // If we have previously completed recv_trailing_metadata, then the
+ // call is finished.
+ bool call_finished = retry_state->completed_recv_trailing_metadata;
+ // Record whether we were already committed before receiving this callback.
+ const bool previously_committed = calld->retry_committed;
// Update bookkeeping in retry_state.
- if (batch_data->batch.send_initial_metadata) {
- retry_state->completed_send_initial_metadata = true;
- }
- if (batch_data->batch.send_message) {
- ++retry_state->completed_send_message_count;
- }
- if (batch_data->batch.send_trailing_metadata) {
- retry_state->completed_send_trailing_metadata = true;
+ update_retry_state_for_completed_batch(batch_data, retry_state);
+ if (call_finished) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
+ calld);
+ }
+ } else {
+ // Check if this batch finished the call, and if so, get its status.
+ // The call is finished if either (a) this callback was invoked with
+ // an error or (b) we receive status.
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_mdelem* server_pushback_md = nullptr;
+ if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a).
+ call_finished = true;
+ grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
+ nullptr);
+ } else if (batch_data->batch.recv_trailing_metadata) { // Case (b).
+ call_finished = true;
+ grpc_metadata_batch* md_batch =
+ batch_data->batch.payload->recv_trailing_metadata
+ .recv_trailing_metadata;
+ GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+ status = grpc_get_status_code_from_metadata(
+ md_batch->idx.named.grpc_status->md);
+ if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+ server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+ }
+ }
+ // If the call just finished, check if we should retry.
+ if (call_finished) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
+ calld, grpc_status_code_to_string(status));
+ }
+ if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+ // Unref batch_data for deferred recv_initial_metadata_ready or
+ // recv_message_ready callbacks, if any.
+ if (batch_data->batch.recv_trailing_metadata &&
+ retry_state->recv_initial_metadata_ready_deferred_batch !=
+ nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+ }
+ if (batch_data->batch.recv_trailing_metadata &&
+ retry_state->recv_message_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_message_error);
+ }
+ // Track number of pending subchannel send batches and determine if
+ // this was the last one.
+ bool last_callback_complete = false;
+ if (batch_data->batch.send_initial_metadata ||
+ batch_data->batch.send_message ||
+ batch_data->batch.send_trailing_metadata) {
+ --calld->num_pending_retriable_subchannel_send_batches;
+ last_callback_complete =
+ calld->num_pending_retriable_subchannel_send_batches == 0;
+ }
+ batch_data_unref(batch_data);
+ // If we just completed the last subchannel send batch, unref the
+ // call stack.
+ if (last_callback_complete) {
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
+ }
+ return;
+ }
+ // Not retrying, so commit the call.
+ retry_commit(elem, retry_state);
+ }
}
- // If the call is committed, free cached data for send ops that we've just
- // completed.
- if (calld->retry_committed) {
+ // If we were already committed before receiving this callback, free
+ // cached data for send ops that we've just completed. (If the call has
+ // just now finished, the call to retry_commit() above will have freed all
+ // cached send ops, so we don't need to do it here.)
+ if (previously_committed) {
free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
}
+ // Call not being retried.
// Construct list of closures to execute.
- grpc_core::CallCombinerClosureList closures;
- // If a retry was already dispatched, that means we saw
- // recv_trailing_metadata before this, so we do nothing here.
- // Otherwise, invoke the callback to return the result to the surface.
- if (!retry_state->retry_dispatched) {
- // Add closure for the completed pending batch, if any.
- add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
- GRPC_ERROR_REF(error), &closures);
- // If needed, add a callback to start any replay or pending send ops on
- // the subchannel call.
- if (!retry_state->completed_recv_trailing_metadata) {
- add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
- &closures);
- }
+ // Max number of closures is number of pending batches plus one for
+ // each of:
+ // - recv_initial_metadata_ready (either deferred or unstarted)
+ // - recv_message_ready (either deferred or unstarted)
+ // - starting a new batch for pending send ops
+ closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
+ size_t num_closures = 0;
+ // If there are deferred recv_initial_metadata_ready or recv_message_ready
+ // callbacks, add them to closures.
+ add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
+ &num_closures);
+ // Find pending batches whose ops are now complete and add their
+ // on_complete callbacks to closures.
+ add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
+ GRPC_ERROR_REF(error), closures,
+ &num_closures);
+ // Add closures to handle any pending batches that have not yet been started.
+ // If the call is finished, we fail these batches; otherwise, we add a
+ // callback to start_retriable_subchannel_batches() to start them on
+ // the subchannel call.
+ if (call_finished) {
+ add_closures_to_fail_unstarted_pending_batches(
+ elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
+ } else {
+ add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
+ closures, &num_closures);
}
// Track number of pending subchannel send batches and determine if this
// was the last one.
- --calld->num_pending_retriable_subchannel_send_batches;
- const bool last_send_batch_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
+ bool last_callback_complete = false;
+ if (batch_data->batch.send_initial_metadata ||
+ batch_data->batch.send_message ||
+ batch_data->batch.send_trailing_metadata) {
+ --calld->num_pending_retriable_subchannel_send_batches;
+ last_callback_complete =
+ calld->num_pending_retriable_subchannel_send_batches == 0;
+ }
// Don't need batch_data anymore.
batch_data_unref(batch_data);
// Schedule all of the closures identified above.
// Note: This yeilds the call combiner.
- closures.RunClosures(calld->call_combiner);
- // If this was the last subchannel send batch, unref the call stack.
- if (last_send_batch_complete) {
+ execute_closures_in_call_combiner(elem, "on_complete", closures,
+ num_closures);
+ // If we just completed the last subchannel send batch, unref the call stack.
+ if (last_callback_complete) {
GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
}
}
@@ -2089,22 +2185,27 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
// Adds a closure to closures that will execute batch in the call combiner.
static void add_closure_for_subchannel_batch(
- grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
- grpc_core::CallCombinerClosureList* closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
+ call_data* calld, grpc_transport_stream_op_batch* batch,
+ closure_to_execute* closures, size_t* num_closures) {
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
start_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = &batch->handler_private.closure;
+ closure->error = GRPC_ERROR_NONE;
+ // If the tracer is enabled, we log a more detailed message, which
+ // requires dynamic allocation. This will be freed in
+ // start_retriable_subchannel_batches().
if (grpc_client_channel_trace.enabled()) {
char* batch_str = grpc_transport_stream_op_batch_string(batch);
- gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
- calld, batch_str);
+ gpr_asprintf(const_cast<char**>(&closure->reason),
+ "starting batch in call combiner: %s", batch_str);
gpr_free(batch_str);
+ closure->free_reason = true;
+ } else {
+ closure->reason = "start_subchannel_batch";
}
- closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
- "start_subchannel_batch");
}
// Adds retriable send_initial_metadata op to batch_data.
@@ -2240,13 +2341,9 @@ static void add_retriable_recv_trailing_metadata_op(
grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
&batch_data->recv_trailing_metadata;
- batch_data->batch.payload->recv_trailing_metadata.collect_stats =
+ batch_data->batch.collect_stats = true;
+ batch_data->batch.payload->collect_stats.collect_stats =
&batch_data->collect_stats;
- GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
- recv_trailing_metadata_ready, batch_data,
- grpc_schedule_on_exec_ctx);
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
}
// Helper function used to start a recv_trailing_metadata batch. This
@@ -2267,11 +2364,9 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Create batch_data with 2 refs, since this batch will be unreffed twice:
- // once for the recv_trailing_metadata_ready callback when the subchannel
- // batch returns, and again when we actually get a recv_trailing_metadata
- // op from the surface.
- subchannel_batch_data* batch_data =
- batch_data_create(elem, 2, false /* set_on_complete */);
+ // once when the subchannel batch returns, and again when we actually get
+ // a recv_trailing_metadata op from the surface.
+ subchannel_batch_data* batch_data = batch_data_create(elem, 2);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
retry_state->recv_trailing_metadata_internal_batch = batch_data;
// Note: This will release the call combiner.
@@ -2296,7 +2391,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
"send_initial_metadata op",
chand, calld);
}
- replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
+ replay_batch_data = batch_data_create(elem, 1);
add_retriable_send_initial_metadata_op(calld, retry_state,
replay_batch_data);
}
@@ -2313,8 +2408,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data =
- batch_data_create(elem, 1, true /* set_on_complete */);
+ replay_batch_data = batch_data_create(elem, 1);
}
add_retriable_send_message_op(elem, retry_state, replay_batch_data);
}
@@ -2333,8 +2427,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data =
- batch_data_create(elem, 1, true /* set_on_complete */);
+ replay_batch_data = batch_data_create(elem, 1);
}
add_retriable_send_trailing_metadata_op(calld, retry_state,
replay_batch_data);
@@ -2346,7 +2439,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// *num_batches as needed.
static void add_subchannel_batches_for_pending_batches(
grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_core::CallCombinerClosureList* closures) {
+ closure_to_execute* closures, size_t* num_closures) {
call_data* calld = static_cast<call_data*>(elem->call_data);
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
@@ -2402,11 +2495,13 @@ static void add_subchannel_batches_for_pending_batches(
if (retry_state->completed_recv_trailing_metadata) {
subchannel_batch_data* batch_data =
retry_state->recv_trailing_metadata_internal_batch;
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = &batch_data->on_complete;
// Batches containing recv_trailing_metadata always succeed.
- closures->Add(
- &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
- "re-executing recv_trailing_metadata_ready to propagate "
- "internally triggered result");
+ closure->error = GRPC_ERROR_NONE;
+ closure->reason =
+ "re-executing on_complete for recv_trailing_metadata "
+ "to propagate internally triggered result";
} else {
batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
}
@@ -2418,19 +2513,14 @@ static void add_subchannel_batches_for_pending_batches(
if (calld->method_params == nullptr ||
calld->method_params->retry_policy() == nullptr ||
calld->retry_committed) {
- add_closure_for_subchannel_batch(elem, batch, closures);
+ add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
pending_batch_clear(calld, pending);
continue;
}
// Create batch with the right number of callbacks.
- const bool has_send_ops = batch->send_initial_metadata ||
- batch->send_message ||
- batch->send_trailing_metadata;
- const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
- batch->recv_message +
- batch->recv_trailing_metadata;
- subchannel_batch_data* batch_data = batch_data_create(
- elem, num_callbacks, has_send_ops /* set_on_complete */);
+ const int num_callbacks =
+ 1 + batch->recv_initial_metadata + batch->recv_message;
+ subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
// Cache send ops if needed.
maybe_cache_send_ops_for_batch(calld, pending);
// send_initial_metadata.
@@ -2457,9 +2547,11 @@ static void add_subchannel_batches_for_pending_batches(
}
// recv_trailing_metadata.
if (batch->recv_trailing_metadata) {
+ GPR_ASSERT(batch->collect_stats);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
}
- add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
+ add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
+ num_closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (batch->send_initial_metadata || batch->send_message ||
@@ -2487,13 +2579,15 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Construct list of closures to execute, one for each pending batch.
- grpc_core::CallCombinerClosureList closures;
+ // We can start up to 6 batches.
+ closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
+ size_t num_closures = 0;
// Replay previously-returned send_* ops if needed.
subchannel_batch_data* replay_batch_data =
maybe_create_subchannel_batch_for_replay(elem, retry_state);
if (replay_batch_data != nullptr) {
- add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
- &closures);
+ add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
+ &num_closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (calld->num_pending_retriable_subchannel_send_batches == 0) {
@@ -2502,16 +2596,17 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
++calld->num_pending_retriable_subchannel_send_batches;
}
// Now add pending batches.
- add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
+ add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
+ &num_closures);
// Start batches on subchannel call.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" retriable batches on subchannel_call=%p",
- chand, calld, closures.size(), calld->subchannel_call);
+ chand, calld, num_closures, calld->subchannel_call);
}
- // Note: This will yield the call combiner.
- closures.RunClosures(calld->call_combiner);
+ execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
+ closures, num_closures);
}
//
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc
index d23ad67ad5..e0a41a3637 100644
--- a/src/core/ext/filters/deadline/deadline_filter.cc
+++ b/src/core/ext/filters/deadline/deadline_filter.cc
@@ -128,25 +128,21 @@ static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
}
}
-// Callback run when we receive trailing metadata.
-static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
+// Callback run when the call is complete.
+static void on_complete(void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
cancel_timer_if_needed(deadline_state);
- // Invoke the original callback.
- GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
- GRPC_ERROR_REF(error));
+ // Invoke the next callback.
+ GRPC_CLOSURE_RUN(deadline_state->next_on_complete, GRPC_ERROR_REF(error));
}
-// Inject our own recv_trailing_metadata_ready callback into op.
-static void inject_recv_trailing_metadata_ready(
- grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
- deadline_state->original_recv_trailing_metadata_ready =
- op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
- GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
- recv_trailing_metadata_ready, deadline_state,
+// Inject our own on_complete callback into op.
+static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
+ grpc_transport_stream_op_batch* op) {
+ deadline_state->next_on_complete = op->on_complete;
+ GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state,
grpc_schedule_on_exec_ctx);
- op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &deadline_state->recv_trailing_metadata_ready;
+ op->on_complete = &deadline_state->on_complete;
}
// Callback and associated state for starting the timer after call stack
@@ -230,7 +226,7 @@ void grpc_deadline_state_client_start_transport_stream_op_batch(
// Make sure we know when the call is complete, so that we can cancel
// the timer.
if (op->recv_trailing_metadata) {
- inject_recv_trailing_metadata_ready(deadline_state, op);
+ inject_on_complete_cb(deadline_state, op);
}
}
}
@@ -326,7 +322,7 @@ static void server_start_transport_stream_op_batch(
// the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata) {
- inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
+ inject_on_complete_cb(&calld->base.deadline_state, op);
}
}
// Chain to next filter.
diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h
index 1d797f445a..13207cbd6f 100644
--- a/src/core/ext/filters/deadline/deadline_filter.h
+++ b/src/core/ext/filters/deadline/deadline_filter.h
@@ -37,12 +37,12 @@ typedef struct grpc_deadline_state {
grpc_deadline_timer_state timer_state;
grpc_timer timer;
grpc_closure timer_callback;
- // Closure to invoke when we receive trailing metadata.
+ // Closure to invoke when the call is complete.
// We use this to cancel the timer.
- grpc_closure recv_trailing_metadata_ready;
- // The original recv_trailing_metadata_ready closure, which we chain to
- // after our own closure is invoked.
- grpc_closure* original_recv_trailing_metadata_ready;
+ grpc_closure on_complete;
+ // The original on_complete closure, which we chain to after our own
+ // closure is invoked.
+ grpc_closure* next_on_complete;
} grpc_deadline_state;
//
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index 1678051beb..ae94ce47b9 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -55,8 +55,8 @@ struct call_data {
grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops.
grpc_metadata_batch* recv_trailing_metadata;
- grpc_closure* original_recv_trailing_metadata_ready;
- grpc_closure recv_trailing_metadata_ready;
+ grpc_closure* original_recv_trailing_metadata_on_complete;
+ grpc_closure recv_trailing_metadata_on_complete;
// State for handling send_message ops.
grpc_transport_stream_op_batch* send_message_batch;
size_t send_message_bytes_read;
@@ -153,7 +153,8 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error);
}
-static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
+static void recv_trailing_metadata_on_complete(void* user_data,
+ grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error == GRPC_ERROR_NONE) {
@@ -162,7 +163,7 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
} else {
GRPC_ERROR_REF(error);
}
- GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_on_complete, error);
}
static void send_message_on_complete(void* arg, grpc_error* error) {
@@ -311,10 +312,8 @@ static void hc_start_transport_stream_op_batch(
/* substitute our callback for the higher callback */
calld->recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
- calld->original_recv_trailing_metadata_ready =
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &calld->recv_trailing_metadata_ready;
+ calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
+ batch->on_complete = &calld->recv_trailing_metadata_on_complete;
}
grpc_error* error = GRPC_ERROR_NONE;
@@ -421,8 +420,8 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
- recv_trailing_metadata_ready, elem,
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
+ recv_trailing_metadata_on_complete, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
elem, grpc_schedule_on_exec_ctx);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 0d6b72c66e..a8090d18a6 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1149,10 +1149,12 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
}
}
+/* Flag that this closure barrier wants stats to be updated before finishing */
+#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
/* Flag that this closure barrier may be covering a write in a pollset, and so
we should not complete this closure until we can prove that the write got
scheduled */
-#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
/* First bit of the reference count, stored in the high order bits (with the low
bits being used for flags defined above) */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
@@ -1204,6 +1206,10 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
grpc_error_add_child(closure->error_data.error, error);
}
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
+ if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) {
+ grpc_transport_move_stats(&s->stats, s->collecting_stats);
+ s->collecting_stats = nullptr;
+ }
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
GRPC_CLOSURE_RUN(closure, closure->error_data.error);
@@ -1345,14 +1351,9 @@ static void perform_stream_op_locked(void* stream_op,
}
grpc_closure* on_complete = op->on_complete;
- // TODO(roth): This is a hack needed because we use data inside of the
- // closure itself to do the barrier calculation (i.e., to ensure that
- // we don't schedule the closure until all ops in the batch have been
- // completed). This can go away once we move to a new C++ closure API
- // that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
- on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
- nullptr, grpc_schedule_on_exec_ctx);
+ on_complete =
+ GRPC_CLOSURE_CREATE(do_nothing, nullptr, grpc_schedule_on_exec_ctx);
}
/* use final_data as a barrier until enqueue time; the inital counter is
@@ -1360,6 +1361,12 @@ static void perform_stream_op_locked(void* stream_op,
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error_data.error = GRPC_ERROR_NONE;
+ if (op->collect_stats) {
+ GPR_ASSERT(s->collecting_stats == nullptr);
+ s->collecting_stats = op_payload->collect_stats.collect_stats;
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
+ }
+
if (op->cancel_stream) {
GRPC_STATS_INC_HTTP2_OP_CANCEL();
grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
@@ -1593,11 +1600,8 @@ static void perform_stream_op_locked(void* stream_op,
if (op->recv_trailing_metadata) {
GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
- GPR_ASSERT(s->collecting_stats == nullptr);
- s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
- s->recv_trailing_metadata_finished =
- op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
s->recv_trailing_metadata =
op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true;
@@ -1956,12 +1960,11 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
}
if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
s->recv_trailing_metadata_finished != nullptr) {
- grpc_transport_move_stats(&s->stats, s->collecting_stats);
- s->collecting_stats = nullptr;
grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
s->recv_trailing_metadata);
- null_then_run_closure(&s->recv_trailing_metadata_finished,
- GRPC_ERROR_NONE);
+ grpc_chttp2_complete_closure_step(
+ t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
+ "recv_trailing_metadata_finished");
}
}
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 4a252d972d..420c2d13e1 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -925,10 +925,6 @@ static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
result = false;
}
/* Check if every op that was asked for is done. */
- /* TODO(muxi): We should not consider the recv ops here, since they
- * have their own callbacks. We should invoke a batch's on_complete
- * as soon as all of the batch's send ops are complete, even if
- * there are still recv ops pending. */
else if (curr_op->send_initial_metadata &&
!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
@@ -1284,20 +1280,12 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
op_can_be_run(stream_op, s, &oas->state,
OP_RECV_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
- grpc_error* error = GRPC_ERROR_NONE;
- if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- error = GRPC_ERROR_REF(stream_state->cancel_error);
- } else if (stream_state->state_op_done[OP_FAILED]) {
- error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
- } else if (oas->s->state.rs.trailing_metadata_valid) {
+ if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.trailing_metadata,
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
- GRPC_CLOSURE_SCHED(
- stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
- error);
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_op->cancel_stream &&
@@ -1410,11 +1398,6 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
- if (op->recv_trailing_metadata) {
- GRPC_CLOSURE_SCHED(
- op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
- GRPC_ERROR_CANCELLED);
- }
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
return;
}
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index b0ca7f8207..2c3bff5c1e 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -120,6 +120,7 @@ typedef struct inproc_stream {
struct inproc_stream* stream_list_next;
} inproc_stream;
+static grpc_closure do_nothing_closure;
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
static void op_state_machine(void* arg, grpc_error* error);
@@ -372,10 +373,6 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
const char* msg) {
int is_sm = static_cast<int>(op == s->send_message_op);
int is_stm = static_cast<int>(op == s->send_trailing_md_op);
- // TODO(vjpai): We should not consider the recv ops here, since they
- // have their own callbacks. We should invoke a batch's on_complete
- // as soon as all of the batch's send ops are complete, even if there
- // are still recv ops pending.
int is_rim = static_cast<int>(op == s->recv_initial_md_op);
int is_rm = static_cast<int>(op == s->recv_message_op);
int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
@@ -499,11 +496,6 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
s->send_trailing_md_op = nullptr;
}
if (s->recv_trailing_md_op) {
- INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
- s, error);
- GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready,
- GRPC_ERROR_REF(error));
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
s, error);
complete_if_batch_end_locked(
@@ -647,12 +639,6 @@ static void op_state_machine(void* arg, grpc_error* error) {
s->trailing_md_sent = true;
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO,
- "op_state_machine %p scheduling trailing-metadata-ready", s);
- GRPC_CLOSURE_SCHED(
- s->recv_trailing_md_op->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready,
- GRPC_ERROR_NONE);
- INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete", s);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
@@ -725,12 +711,6 @@ static void op_state_machine(void* arg, grpc_error* error) {
}
if (s->recv_trailing_md_op && s->t->is_client && other &&
other->send_message_op) {
- INPROC_LOG(GPR_INFO,
- "op_state_machine %p scheduling trailing-metadata-ready %p", s,
- GRPC_ERROR_NONE);
- GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready,
- GRPC_ERROR_NONE);
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
}
if (s->to_read_trailing_md_filled) {
@@ -786,10 +766,6 @@ static void op_state_machine(void* arg, grpc_error* error) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err);
- GRPC_CLOSURE_SCHED(
- s->recv_trailing_md_op->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready,
- GRPC_ERROR_REF(new_err));
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = nullptr;
@@ -883,9 +859,6 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
- GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready,
- GRPC_ERROR_REF(s->cancel_self_error));
complete_if_batch_end_locked(
s, s->cancel_self_error, s->recv_trailing_md_op,
"cancel_stream scheduling trailing-md-on-complete");
@@ -900,8 +873,6 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
return ret;
}
-static void do_nothing(void* arg, grpc_error* error) {}
-
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@@ -921,14 +892,8 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
}
grpc_error* error = GRPC_ERROR_NONE;
grpc_closure* on_complete = op->on_complete;
- // TODO(roth): This is a hack needed because we use data inside of the
- // closure itself to do the barrier calculation (i.e., to ensure that
- // we don't schedule the closure until all ops in the batch have been
- // completed). This can go away once we move to a new C++ closure API
- // that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
- on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
- nullptr, grpc_schedule_on_exec_ctx);
+ on_complete = &do_nothing_closure;
}
if (op->cancel_stream) {
@@ -1061,15 +1026,6 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
- if (op->recv_trailing_metadata) {
- INPROC_LOG(
- GPR_INFO,
- "perform_stream_op error %p scheduling trailing-metadata-ready %p",
- s, error);
- GRPC_CLOSURE_SCHED(
- op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
- GRPC_ERROR_REF(error));
- }
}
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
error);
@@ -1173,8 +1129,12 @@ static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
/*******************************************************************************
* GLOBAL INIT AND DESTROY
*/
+static void do_nothing(void* arg, grpc_error* error) {}
+
void grpc_inproc_transport_init(void) {
grpc_core::ExecCtx exec_ctx;
+ GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
+ grpc_schedule_on_exec_ctx);
g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
grpc_slice key_tmp = grpc_slice_from_static_string(":path");
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index e2ea334ded..ddd3029402 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -51,7 +51,6 @@ typedef struct connected_channel_call_data {
callback_state on_complete[6]; // Max number of pending batches.
callback_state recv_initial_metadata_ready;
callback_state recv_message_ready;
- callback_state recv_trailing_metadata_ready;
} call_data;
static void run_in_call_combiner(void* arg, grpc_error* error) {
@@ -112,12 +111,6 @@ static void con_start_transport_stream_op_batch(
intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
- if (batch->recv_trailing_metadata) {
- callback_state* state = &calld->recv_trailing_metadata_ready;
- intercept_callback(
- calld, state, false, "recv_trailing_metadata_ready",
- &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
- }
if (batch->cancel_stream) {
// There can be more than one cancellation batch in flight at any
// given time, so we can't just pick out a fixed index into
@@ -128,7 +121,7 @@ static void con_start_transport_stream_op_batch(
static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
intercept_callback(calld, state, true, "on_complete (cancel_stream)",
&batch->on_complete);
- } else if (batch->on_complete != nullptr) {
+ } else {
callback_state* state = get_state_for_batch(calld, batch);
intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
}
diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h
index 0d2586e507..f36f6cb706 100644
--- a/src/core/lib/gprpp/inlined_vector.h
+++ b/src/core/lib/gprpp/inlined_vector.h
@@ -99,8 +99,6 @@ class InlinedVector {
void push_back(T&& value) { emplace_back(std::move(value)); }
size_t size() const { return size_; }
- bool empty() const { return size_ == 0; }
-
size_t capacity() const { return capacity_; }
void clear() {
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index 641fa18082..0ccd08ea57 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -26,7 +26,6 @@
#include <grpc/support/atm.h>
#include "src/core/lib/gpr/mpscq.h"
-#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/iomgr/closure.h"
// A simple, lock-free mechanism for serializing activity related to a
@@ -110,83 +109,4 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
grpc_error* error);
-namespace grpc_core {
-
-// Helper for running a list of closures in a call combiner.
-//
-// Each callback running in the call combiner will eventually be
-// returned to the surface, at which point the surface will yield the
-// call combiner. So when we are running in the call combiner and have
-// more than one callback to return to the surface, we need to re-enter
-// the call combiner for all but one of those callbacks.
-class CallCombinerClosureList {
- public:
- CallCombinerClosureList() {}
-
- // Adds a closure to the list. The closure must eventually result in
- // the call combiner being yielded.
- void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
- closures_.emplace_back(closure, error, reason);
- }
-
- // Runs all closures in the call combiner and yields the call combiner.
- //
- // All but one of the closures in the list will be scheduled via
- // GRPC_CALL_COMBINER_START(), and the remaining closure will be
- // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
- // yielding the call combiner. If the list is empty, then the call
- // combiner will be yielded immediately.
- void RunClosures(grpc_call_combiner* call_combiner) {
- if (closures_.empty()) {
- GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
- return;
- }
- for (size_t i = 1; i < closures_.size(); ++i) {
- auto& closure = closures_[i];
- GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
- closure.reason);
- }
- if (grpc_call_combiner_trace.enabled()) {
- gpr_log(GPR_INFO,
- "CallCombinerClosureList executing closure while already "
- "holding call_combiner %p: closure=%p error=%s reason=%s",
- call_combiner, closures_[0].closure,
- grpc_error_string(closures_[0].error), closures_[0].reason);
- }
- // This will release the call combiner.
- GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
- closures_.clear();
- }
-
- // Runs all closures in the call combiner, but does NOT yield the call
- // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
- void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
- for (size_t i = 0; i < closures_.size(); ++i) {
- auto& closure = closures_[i];
- GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
- closure.reason);
- }
- closures_.clear();
- }
-
- size_t size() const { return closures_.size(); }
-
- private:
- struct CallCombinerClosure {
- grpc_closure* closure;
- grpc_error* error;
- const char* reason;
-
- CallCombinerClosure(grpc_closure* closure, grpc_error* error,
- const char* reason)
- : closure(closure), error(error), reason(reason) {}
- };
-
- // There are generally a maximum of 6 closures to run in the call
- // combiner, one for each pending op.
- InlinedVector<CallCombinerClosure, 6> closures_;
-};
-
-} // namespace grpc_core
-
#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index f14c723844..34a494485d 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -283,10 +283,9 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) {
if (c->scheduled) {
gpr_log(GPR_ERROR,
"Closure already scheduled. (closure: %p, created: [%s:%d], "
- "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
- "run?: %s",
+ "previously scheduled at: [%s: %d] run?: %s",
c, c->file_created, c->line_created, c->file_initiated,
- c->line_initiated, file, line, c->run ? "true" : "false");
+ c->line_initiated, c->run ? "true" : "false");
abort();
}
c->scheduled = true;
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 8b224b6e7b..1cf8ea94e7 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -233,7 +233,6 @@ struct grpc_call {
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
- grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags;
grpc_closure release_call;
@@ -271,17 +270,8 @@ struct grpc_call {
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
-/* Given a size, round up to the next multiple of sizeof(void*) */
-#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
- (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
-
-#define CALL_STACK_FROM_CALL(call) \
- (grpc_call_stack*)((char*)(call) + \
- ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
-#define CALL_FROM_CALL_STACK(call_stack) \
- (grpc_call*)(((char*)(call_stack)) - \
- ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
-
+#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack*)((call) + 1))
+#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call*)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
@@ -352,9 +342,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
gpr_arena* arena = gpr_arena_create(initial_size);
- call = static_cast<grpc_call*>(
- gpr_arena_alloc(arena, ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
- channel_stack->call_stack_size));
+ call = static_cast<grpc_call*>(gpr_arena_alloc(
+ arena, sizeof(grpc_call) + channel_stack->call_stack_size));
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
grpc_call_combiner_init(&call->call_combiner);
@@ -1220,6 +1209,7 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
+
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
@@ -1227,9 +1217,14 @@ static void post_batch_completion(batch_control* bctl) {
}
if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
+
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
+ grpc_metadata_batch* md =
+ &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+ recv_trailing_filter(call, md);
+
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
parent_call* pc = get_parent_call(call);
@@ -1251,6 +1246,7 @@ static void post_batch_completion(batch_control* bctl) {
}
gpr_mu_unlock(&pc->child_list_mu);
}
+
if (call->is_client) {
get_final_status(call, set_status_value_directly,
call->final_op.client.status,
@@ -1260,6 +1256,7 @@ static void post_batch_completion(batch_control* bctl) {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
+
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@@ -1541,19 +1538,6 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
finish_batch_step(bctl);
}
-static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
- batch_control* bctl = static_cast<batch_control*>(bctlp);
- grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
- if (error == GRPC_ERROR_NONE) {
- grpc_metadata_batch* md =
- &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(call, md);
- }
- finish_batch_step(bctl);
-}
-
static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
@@ -1574,8 +1558,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t i;
const grpc_op* op;
batch_control* bctl;
- bool has_send_ops = false;
- int num_recv_ops = 0;
+ int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
grpc_transport_stream_op_batch* stream_op;
grpc_transport_stream_op_batch_payload* stream_op_payload;
@@ -1681,7 +1664,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->send_initial_metadata.peer_string =
&call->peer_string;
}
- has_send_ops = true;
break;
}
case GRPC_OP_SEND_MESSAGE: {
@@ -1711,7 +1693,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
&op->data.send_message.send_message->data.raw.slice_buffer, flags);
stream_op_payload->send_message.send_message.reset(
call->sending_stream.get());
- has_send_ops = true;
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1732,7 +1713,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->sent_final_op = true;
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
- has_send_ops = true;
break;
}
case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@@ -1797,7 +1777,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
- has_send_ops = true;
break;
}
case GRPC_OP_RECV_INITIAL_METADATA: {
@@ -1825,7 +1804,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->recv_initial_metadata.peer_string =
&call->peer_string;
}
- ++num_recv_ops;
+ num_completion_callbacks_needed++;
break;
}
case GRPC_OP_RECV_MESSAGE: {
@@ -1847,7 +1826,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
- ++num_recv_ops;
+ num_completion_callbacks_needed++;
break;
}
case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@@ -1873,16 +1852,11 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.client.error_string =
op->data.recv_status_on_client.error_string;
stream_op->recv_trailing_metadata = true;
+ stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->recv_trailing_metadata.collect_stats =
+ stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
- GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
- receiving_trailing_metadata_ready, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &call->receiving_trailing_metadata_ready;
- ++num_recv_ops;
break;
}
case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@@ -1903,16 +1877,11 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
stream_op->recv_trailing_metadata = true;
+ stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->recv_trailing_metadata.collect_stats =
+ stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
- GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
- receiving_trailing_metadata_ready, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &call->receiving_trailing_metadata_ready;
- ++num_recv_ops;
break;
}
}
@@ -1922,15 +1891,13 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
- gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
-
- if (has_send_ops) {
- GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op->on_complete = &bctl->finish_batch;
- }
+ gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
+ GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
+
execute_batch(call, stream_op, &bctl->start_batch);
done:
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index cbdb77c844..039d603394 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -212,32 +212,21 @@ void grpc_transport_stream_op_batch_finish_with_failure(
if (batch->send_message) {
batch->payload->send_message.send_message.reset();
}
- if (batch->cancel_stream) {
- GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
+ if (batch->recv_message) {
+ GRPC_CALL_COMBINER_START(
+ call_combiner, batch->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error), "failing recv_message_ready");
}
- // Construct a list of closures to execute.
- grpc_core::CallCombinerClosureList closures;
if (batch->recv_initial_metadata) {
- closures.Add(
+ GRPC_CALL_COMBINER_START(
+ call_combiner,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
}
- if (batch->recv_message) {
- closures.Add(batch->payload->recv_message.recv_message_ready,
- GRPC_ERROR_REF(error), "failing recv_message_ready");
- }
- if (batch->recv_trailing_metadata) {
- closures.Add(
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
- GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
- }
- if (batch->on_complete != nullptr) {
- closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
- "failing on_complete");
+ GRPC_CLOSURE_SCHED(batch->on_complete, error);
+ if (batch->cancel_stream) {
+ GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
}
- // Execute closures.
- closures.RunClosures(call_combiner);
- GRPC_ERROR_UNREF(error);
}
typedef struct {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 585b9dfae9..b2e252d939 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -122,15 +122,9 @@ typedef struct grpc_transport_stream_op_batch_payload
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op_batch {
- /** Should be scheduled when all of the non-recv operations in the batch
- are complete.
-
- The recv ops (recv_initial_metadata, recv_message, and
- recv_trailing_metadata) each have their own callbacks. If a batch
- contains both recv ops and non-recv ops, on_complete should be
- scheduled as soon as the non-recv ops are complete, regardless of
- whether or not the recv ops are complete. If a batch contains
- only recv ops, on_complete can be null. */
+ /** Should be enqueued when all requested operations (excluding recv_message
+ and recv_initial_metadata which have their own closures) in a given batch
+ have been completed. */
grpc_closure* on_complete;
/** Values for the stream op (fields set are determined by flags above) */
@@ -155,6 +149,9 @@ typedef struct grpc_transport_stream_op_batch {
*/
bool recv_trailing_metadata : 1;
+ /** Collect any stats into provided buffer, zero internal stat counters */
+ bool collect_stats : 1;
+
/** Cancel this stream with the provided error */
bool cancel_stream : 1;
@@ -222,11 +219,12 @@ struct grpc_transport_stream_op_batch_payload {
struct {
grpc_metadata_batch* recv_trailing_metadata;
- grpc_transport_stream_stats* collect_stats;
- /** Should be enqueued when initial metadata is ready to be processed. */
- grpc_closure* recv_trailing_metadata_ready;
} recv_trailing_metadata;
+ struct {
+ grpc_transport_stream_stats* collect_stats;
+ } collect_stats;
+
/** Forcefully close this stream.
The HTTP2 semantics should be:
- server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 8c7db642a5..25ab492f3a 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -120,6 +120,13 @@ char* grpc_transport_stream_op_batch_string(
gpr_strvec_add(&b, tmp);
}
+ if (op->collect_stats) {
+ gpr_strvec_add(&b, gpr_strdup(" "));
+ gpr_asprintf(&tmp, "COLLECT_STATS:%p",
+ op->payload->collect_stats.collect_stats);
+ gpr_strvec_add(&b, tmp);
+ }
+
out = gpr_strvec_flatten(&b, nullptr);
gpr_strvec_destroy(&b);
diff --git a/test/core/gprpp/inlined_vector_test.cc b/test/core/gprpp/inlined_vector_test.cc
index 41f4338f8a..ae34947718 100644
--- a/test/core/gprpp/inlined_vector_test.cc
+++ b/test/core/gprpp/inlined_vector_test.cc
@@ -27,12 +27,10 @@ namespace testing {
TEST(InlinedVectorTest, CreateAndIterate) {
const int kNumElements = 9;
InlinedVector<int, 2> v;
- EXPECT_TRUE(v.empty());
for (int i = 0; i < kNumElements; ++i) {
v.push_back(i);
}
EXPECT_EQ(static_cast<size_t>(kNumElements), v.size());
- EXPECT_FALSE(v.empty());
for (int i = 0; i < kNumElements; ++i) {
EXPECT_EQ(i, v[i]);
EXPECT_EQ(i, &v[i] - &v[0]); // Ensure contiguous allocation.
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index dd1610dc3d..831b29c506 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -621,26 +621,18 @@ typedef struct {
static void StartTransportStreamOp(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
call_data* calld = static_cast<call_data*>(elem->call_data);
- // Construct list of closures to return.
- grpc_core::CallCombinerClosureList closures;
if (op->recv_initial_metadata) {
- closures.Add(op->payload->recv_initial_metadata.recv_initial_metadata_ready,
- GRPC_ERROR_NONE, "recv_initial_metadata");
+ GRPC_CALL_COMBINER_START(
+ calld->call_combiner,
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE, "recv_initial_metadata");
}
if (op->recv_message) {
- closures.Add(op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE,
- "recv_message");
+ GRPC_CALL_COMBINER_START(calld->call_combiner,
+ op->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_NONE, "recv_message");
}
- if (op->recv_trailing_metadata) {
- closures.Add(
- op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
- GRPC_ERROR_NONE, "recv_trailing_metadata");
- }
- if (op->on_complete != nullptr) {
- closures.Add(op->on_complete, GRPC_ERROR_NONE, "on_complete");
- }
- // Execute closures.
- closures.RunClosures(calld->call_combiner);
+ GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_NONE);
}
static void StartTransportOp(grpc_channel_element* elem,