Diffstat (limited to 'src')
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.cc  891
-rw-r--r--  src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc  4
-rw-r--r--  src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h  14
-rw-r--r--  src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc  93
-rw-r--r--  src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc  155
-rw-r--r--  src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h  4
-rw-r--r--  src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc  11
-rw-r--r--  src/core/ext/filters/deadline/deadline_filter.cc  28
-rw-r--r--  src/core/ext/filters/deadline/deadline_filter.h  10
-rw-r--r--  src/core/ext/filters/http/client/http_client_filter.cc  19
-rw-r--r--  src/core/ext/transport/chttp2/transport/chttp2_transport.cc  35
-rw-r--r--  src/core/ext/transport/cronet/transport/cronet_transport.cc  19
-rw-r--r--  src/core/ext/transport/inproc/inproc_transport.cc  52
-rw-r--r--  src/core/lib/channel/connected_channel.cc  9
-rw-r--r--  src/core/lib/gprpp/inlined_vector.h  2
-rw-r--r--  src/core/lib/iomgr/call_combiner.h  80
-rw-r--r--  src/core/lib/iomgr/closure.h  5
-rw-r--r--  src/core/lib/surface/call.cc  81
-rw-r--r--  src/core/lib/transport/transport.cc  29
-rw-r--r--  src/core/lib/transport/transport.h  22
-rw-r--r--  src/core/lib/transport/transport_op_string.cc  7
-rw-r--r--  src/core/tsi/ssl_transport_security.cc  5
-rw-r--r--  src/php/README.md  3
23 files changed, 835 insertions(+), 743 deletions(-)
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index ea6775a8d8..34ea97e23e 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -817,6 +817,7 @@ typedef struct {
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
+ grpc_closure recv_trailing_metadata_ready;
// For intercepting on_complete.
grpc_closure on_complete;
} subchannel_batch_data;
@@ -1192,35 +1193,24 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
"chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
elem->channel_data, calld, num_batches, grpc_error_string(error));
}
- grpc_transport_stream_op_batch*
- batches[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_batches = 0;
+ grpc_core::CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batches[num_batches++] = batch;
+ batch->handler_private.extra_arg = calld;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ fail_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
+ "pending_batches_fail");
pending_batch_clear(calld, pending);
}
}
- for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
- grpc_transport_stream_op_batch* batch = batches[i];
- batch->handler_private.extra_arg = calld;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- fail_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- &batch->handler_private.closure,
- GRPC_ERROR_REF(error), "pending_batches_fail");
- }
if (yield_call_combiner) {
- if (num_batches > 0) {
- // Note: This will release the call combiner.
- grpc_transport_stream_op_batch_finish_with_failure(
- batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
- } else {
- GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
- }
+ closures.RunClosures(calld->call_combiner);
+ } else {
+ closures.RunClosuresWithoutYielding(calld->call_combiner);
}
GRPC_ERROR_UNREF(error);
}
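[Editor's note] The ad-hoc closure arrays this hunk (and several below) delete are replaced by grpc_core::CallCombinerClosureList, whose implementation lands in src/core/lib/iomgr/call_combiner.h (see the diffstat). Reconstructed purely from the call sites in this diff — Add(), RunClosures(), RunClosuresWithoutYielding(), and size() — a minimal sketch of the helper might look like the following; the member names, constructor, and InlinedVector capacity are assumptions for illustration:

    // Sketch only: a list of closures to run under a call combiner.
    // RunClosures() schedules the first closure directly (which yields
    // the combiner we already hold) and re-enters the combiner for the
    // rest; RunClosuresWithoutYielding() re-enters it for all of them.
    class CallCombinerClosureList {
     public:
      void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
        closures_.emplace_back(closure, error, reason);
      }

      // Note: Releases the call combiner.
      void RunClosures(grpc_call_combiner* call_combiner) {
        if (closures_.empty()) {
          GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to run");
          return;
        }
        for (size_t i = 1; i < closures_.size(); ++i) {
          GRPC_CALL_COMBINER_START(call_combiner, closures_[i].closure,
                                   closures_[i].error, closures_[i].reason);
        }
        // We already hold the combiner, so scheduling the first closure
        // directly yields it to that closure.
        GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
        closures_.clear();
      }

      // Note: Does not release the call combiner.
      void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
        for (size_t i = 0; i < closures_.size(); ++i) {
          GRPC_CALL_COMBINER_START(call_combiner, closures_[i].closure,
                                   closures_[i].error, closures_[i].reason);
        }
        closures_.clear();
      }

      size_t size() const { return closures_.size(); }

     private:
      struct CallCombinerClosure {
        CallCombinerClosure(grpc_closure* closure, grpc_error* error,
                            const char* reason)
            : closure(closure), error(error), reason(reason) {}
        grpc_closure* closure;
        grpc_error* error;
        const char* reason;
      };
      // Capacity assumed to match the six pending-batch slots used here.
      grpc_core::InlinedVector<CallCombinerClosure, 6> closures_;
    };

pending_batches_fail() above then picks RunClosures() when it must yield the combiner and RunClosuresWithoutYielding() when its caller keeps holding it.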
@@ -1255,30 +1245,22 @@ static void pending_batches_resume(grpc_call_element* elem) {
" pending batches on subchannel_call=%p",
chand, calld, num_batches, calld->subchannel_call);
}
- grpc_transport_stream_op_batch*
- batches[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_batches = 0;
+ grpc_core::CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- batches[num_batches++] = batch;
+ batch->handler_private.extra_arg = calld->subchannel_call;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ resume_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+ "pending_batches_resume");
pending_batch_clear(calld, pending);
}
}
- for (size_t i = 1; i < num_batches; ++i) {
- grpc_transport_stream_op_batch* batch = batches[i];
- batch->handler_private.extra_arg = calld->subchannel_call;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- resume_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- &batch->handler_private.closure, GRPC_ERROR_NONE,
- "pending_batches_resume");
- }
- GPR_ASSERT(num_batches > 0);
// Note: This will release the call combiner.
- grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
+ closures.RunClosures(calld->call_combiner);
}
static void maybe_clear_pending_batch(grpc_call_element* elem,
@@ -1293,7 +1275,10 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
nullptr) &&
(!batch->recv_message ||
- batch->payload->recv_message.recv_message_ready == nullptr)) {
+ batch->payload->recv_message.recv_message_ready == nullptr) &&
+ (!batch->recv_trailing_metadata ||
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
+ nullptr)) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
calld);
@@ -1302,75 +1287,27 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
}
}
-// Returns true if all ops in the pending batch have been completed.
-static bool pending_batch_is_completed(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
- if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
- return false;
- }
- if (pending->batch->send_initial_metadata &&
- !retry_state->completed_send_initial_metadata) {
- return false;
- }
- if (pending->batch->send_message &&
- retry_state->completed_send_message_count <
- calld->send_messages->size()) {
- return false;
- }
- if (pending->batch->send_trailing_metadata &&
- !retry_state->completed_send_trailing_metadata) {
- return false;
- }
- if (pending->batch->recv_initial_metadata &&
- !retry_state->completed_recv_initial_metadata) {
- return false;
- }
- if (pending->batch->recv_message &&
- retry_state->completed_recv_message_count <
- retry_state->started_recv_message_count) {
- return false;
- }
- if (pending->batch->recv_trailing_metadata &&
- !retry_state->completed_recv_trailing_metadata) {
- return false;
- }
- return true;
-}
-
-// Returns true if any op in the batch was not yet started.
-static bool pending_batch_is_unstarted(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
- if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
- return false;
- }
- if (pending->batch->send_initial_metadata &&
- !retry_state->started_send_initial_metadata) {
- return true;
- }
- if (pending->batch->send_message &&
- retry_state->started_send_message_count < calld->send_messages->size()) {
- return true;
- }
- if (pending->batch->send_trailing_metadata &&
- !retry_state->started_send_trailing_metadata) {
- return true;
- }
- if (pending->batch->recv_initial_metadata &&
- !retry_state->started_recv_initial_metadata) {
- return true;
- }
- if (pending->batch->recv_message &&
- retry_state->completed_recv_message_count ==
- retry_state->started_recv_message_count) {
- return true;
- }
- if (pending->batch->recv_trailing_metadata &&
- !retry_state->started_recv_trailing_metadata) {
- return true;
+// Returns a pointer to the first pending batch for which predicate(batch)
+// returns true, or null if not found.
+template <typename Predicate>
+static pending_batch* pending_batch_find(grpc_call_element* elem,
+ const char* log_message,
+ Predicate predicate) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ grpc_transport_stream_op_batch* batch = pending->batch;
+ if (batch != nullptr && predicate(batch)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
+ calld, log_message, i);
+ }
+ return pending;
+ }
}
- return false;
+ return nullptr;
}
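[Editor's note] Later hunks in this file call this helper with a lambda selecting the op of interest. As a hypothetical example (the predicate and log message here are invented, not from this diff), a caller looking for a pending batch that still carries a send_message op would write:

    pending_batch* pending = pending_batch_find(
        elem, "replaying send_message for",
        [](grpc_transport_stream_op_batch* batch) {
          return batch->send_message;
        });
    if (pending == nullptr) return;  // no matching pending batch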
//
@@ -1557,8 +1494,13 @@ static bool maybe_retry(grpc_call_element* elem,
// subchannel_batch_data
//
+// Creates a subchannel_batch_data object on the call's arena with the
+// specified refcount. If set_on_complete is true, the batch's
+// on_complete callback will be set to point to on_complete();
+// otherwise, the batch's on_complete callback will be null.
static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
- int refcount) {
+ int refcount,
+ bool set_on_complete) {
call_data* calld = static_cast<call_data*>(elem->call_data);
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
@@ -1571,9 +1513,11 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
batch_data->batch.payload = &retry_state->batch_payload;
gpr_ref_init(&batch_data->refs, refcount);
- GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
- grpc_schedule_on_exec_ctx);
- batch_data->batch.on_complete = &batch_data->on_complete;
+ if (set_on_complete) {
+ GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
+ grpc_schedule_on_exec_ctx);
+ batch_data->batch.on_complete = &batch_data->on_complete;
+ }
GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
return batch_data;
}
@@ -1606,26 +1550,14 @@ static void batch_data_unref(subchannel_batch_data* batch_data) {
static void invoke_recv_initial_metadata_callback(void* arg,
grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
- channel_data* chand =
- static_cast<channel_data*>(batch_data->elem->channel_data);
- call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending batch.
- pending_batch* pending = nullptr;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
- if (batch != nullptr && batch->recv_initial_metadata &&
- batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
- nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
- "pending batch at index %" PRIuPTR,
- chand, calld, i);
- }
- pending = &calld->pending_batches[i];
- break;
- }
- }
+ pending_batch* pending = pending_batch_find(
+ batch_data->elem, "invoking recv_initial_metadata_ready for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_initial_metadata &&
+ batch->payload->recv_initial_metadata
+ .recv_initial_metadata_ready != nullptr;
+ });
GPR_ASSERT(pending != nullptr);
// Return metadata.
grpc_metadata_batch_move(
@@ -1661,10 +1593,19 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
+ retry_state->completed_recv_initial_metadata = true;
+ // If a retry was already dispatched, then we're not going to use the
+ // result of this recv_initial_metadata op, so do nothing.
+ if (retry_state->retry_dispatched) {
+ GRPC_CALL_COMBINER_STOP(
+ calld->call_combiner,
+ "recv_initial_metadata_ready after retry dispatched");
+ return;
+ }
// If we got an error or a Trailers-Only response and have not yet gotten
- // the recv_trailing_metadata on_complete callback, then defer
- // propagating this callback back to the surface. We can evaluate whether
- // to retry when recv_trailing_metadata comes back.
+ // the recv_trailing_metadata_ready callback, then defer propagating this
+ // callback back to the surface. We can evaluate whether to retry when
+ // recv_trailing_metadata comes back.
if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1689,9 +1630,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
}
// Received valid initial metadata, so commit the call.
retry_commit(elem, retry_state);
+ // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_initial_metadata_callback(batch_data, error);
- GRPC_ERROR_UNREF(error);
}
//
@@ -1701,25 +1642,13 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
// Invokes recv_message_ready for a subchannel batch.
static void invoke_recv_message_callback(void* arg, grpc_error* error) {
subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
- channel_data* chand =
- static_cast<channel_data*>(batch_data->elem->channel_data);
- call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
// Find pending op.
- pending_batch* pending = nullptr;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
- if (batch != nullptr && batch->recv_message &&
- batch->payload->recv_message.recv_message_ready != nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: invoking recv_message_ready for "
- "pending batch at index %" PRIuPTR,
- chand, calld, i);
- }
- pending = &calld->pending_batches[i];
- break;
- }
- }
+ pending_batch* pending = pending_batch_find(
+ batch_data->elem, "invoking recv_message_ready for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_message &&
+ batch->payload->recv_message.recv_message_ready != nullptr;
+ });
GPR_ASSERT(pending != nullptr);
// Return payload.
*pending->batch->payload->recv_message.recv_message =
@@ -1751,10 +1680,18 @@ static void recv_message_ready(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
+ ++retry_state->completed_recv_message_count;
+ // If a retry was already dispatched, then we're not going to use the
+ // result of this recv_message op, so do nothing.
+ if (retry_state->retry_dispatched) {
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+ "recv_message_ready after retry dispatched");
+ return;
+ }
// If we got an error or the payload was nullptr and we have not yet gotten
- // the recv_trailing_metadata on_complete callback, then defer
- // propagating this callback back to the surface. We can evaluate whether
- // to retry when recv_trailing_metadata comes back.
+ // the recv_trailing_metadata_ready callback, then defer propagating this
+ // callback back to the surface. We can evaluate whether to retry when
+ // recv_trailing_metadata comes back.
if (GPR_UNLIKELY(
(batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
@@ -1777,133 +1714,268 @@ static void recv_message_ready(void* arg, grpc_error* error) {
}
// Received a valid message, so commit the call.
retry_commit(elem, retry_state);
+ // Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
invoke_recv_message_callback(batch_data, error);
- GRPC_ERROR_UNREF(error);
}
//
-// list of closures to execute in call combiner
+// recv_trailing_metadata handling
//
-// Represents a closure that needs to run in the call combiner as part of
-// starting or completing a batch.
-typedef struct {
- grpc_closure* closure;
- grpc_error* error;
- const char* reason;
- bool free_reason = false;
-} closure_to_execute;
-
-static void execute_closures_in_call_combiner(grpc_call_element* elem,
- const char* caller,
- closure_to_execute* closures,
- size_t num_closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+// Sets *status and *server_pushback_md based on batch_data and error.
+static void get_call_status(subchannel_batch_data* batch_data,
+ grpc_error* error, grpc_status_code* status,
+ grpc_mdelem** server_pushback_md) {
+ grpc_call_element* elem = batch_data->elem;
call_data* calld = static_cast<call_data*>(elem->call_data);
- // Note that the call combiner will be yielded for each closure that
- // we schedule. We're already running in the call combiner, so one of
- // the closures can be scheduled directly, but the others will
- // have to re-enter the call combiner.
- if (num_closures > 0) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
- calld, caller, closures[0].reason);
- }
- GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
- if (closures[0].free_reason) {
- gpr_free(const_cast<char*>(closures[0].reason));
- }
- for (size_t i = 1; i < num_closures; ++i) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: %s starting closure in call combiner: %s",
- chand, calld, caller, closures[i].reason);
- }
- GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
- closures[i].error, closures[i].reason);
- if (closures[i].free_reason) {
- gpr_free(const_cast<char*>(closures[i].reason));
- }
- }
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
+ nullptr);
} else {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
- calld, caller);
+ grpc_metadata_batch* md_batch =
+ batch_data->batch.payload->recv_trailing_metadata
+ .recv_trailing_metadata;
+ GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+ *status =
+ grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
+ if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+ *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
}
- GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
}
+ GRPC_ERROR_UNREF(error);
}
-//
-// on_complete callback handling
-//
-
-// Updates retry_state to reflect the ops completed in batch_data.
-static void update_retry_state_for_completed_batch(
- subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state) {
- if (batch_data->batch.send_initial_metadata) {
- retry_state->completed_send_initial_metadata = true;
- }
- if (batch_data->batch.send_message) {
- ++retry_state->completed_send_message_count;
- }
- if (batch_data->batch.send_trailing_metadata) {
- retry_state->completed_send_trailing_metadata = true;
- }
- if (batch_data->batch.recv_initial_metadata) {
- retry_state->completed_recv_initial_metadata = true;
- }
- if (batch_data->batch.recv_message) {
- ++retry_state->completed_recv_message_count;
- }
- if (batch_data->batch.recv_trailing_metadata) {
- retry_state->completed_recv_trailing_metadata = true;
+// Adds recv_trailing_metadata_ready closure to closures.
+static void add_closure_for_recv_trailing_metadata_ready(
+ grpc_call_element* elem, subchannel_batch_data* batch_data,
+ grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
+ // Find pending batch.
+ pending_batch* pending = pending_batch_find(
+ elem, "invoking recv_trailing_metadata for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_trailing_metadata &&
+ batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready != nullptr;
+ });
+ // If we generated the recv_trailing_metadata op internally via
+ // start_internal_recv_trailing_metadata(), then there will be no
+ // pending batch.
+ if (pending == nullptr) {
+ GRPC_ERROR_UNREF(error);
+ return;
}
+ // Return metadata.
+ grpc_metadata_batch_move(
+ &batch_data->recv_trailing_metadata,
+ pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
+ // Add closure.
+ closures->Add(pending->batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ error, "recv_trailing_metadata_ready for pending batch");
+ // Update bookkeeping.
+ pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ nullptr;
+ maybe_clear_pending_batch(elem, pending);
}
// Adds any necessary closures for deferred recv_initial_metadata and
-// recv_message callbacks to closures, updating *num_closures as needed.
+// recv_message callbacks to closures.
static void add_closures_for_deferred_recv_callbacks(
subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
- closure_to_execute* closures, size_t* num_closures) {
+ grpc_core::CallCombinerClosureList* closures) {
if (batch_data->batch.recv_trailing_metadata) {
// Add closure for deferred recv_initial_metadata_ready.
if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
nullptr)) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->recv_initial_metadata_ready,
- invoke_recv_initial_metadata_callback,
- retry_state->recv_initial_metadata_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_initial_metadata_error;
- closure->reason = "resuming recv_initial_metadata_ready";
+ GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
+ invoke_recv_initial_metadata_callback,
+ retry_state->recv_initial_metadata_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->recv_initial_metadata_ready,
+ retry_state->recv_initial_metadata_error,
+ "resuming recv_initial_metadata_ready");
retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
}
// Add closure for deferred recv_message_ready.
if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
nullptr)) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->recv_message_ready, invoke_recv_message_callback,
- retry_state->recv_message_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_message_error;
- closure->reason = "resuming recv_message_ready";
+ GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
+ invoke_recv_message_callback,
+ retry_state->recv_message_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->recv_message_ready,
+ retry_state->recv_message_error,
+ "resuming recv_message_ready");
retry_state->recv_message_ready_deferred_batch = nullptr;
}
}
}
+// Returns true if any op in the batch was not yet started.
+// Only looks at send ops, since recv ops are always started immediately.
+static bool pending_batch_is_unstarted(
+ pending_batch* pending, call_data* calld,
+ subchannel_call_retry_state* retry_state) {
+ if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+ return false;
+ }
+ if (pending->batch->send_initial_metadata &&
+ !retry_state->started_send_initial_metadata) {
+ return true;
+ }
+ if (pending->batch->send_message &&
+ retry_state->started_send_message_count < calld->send_messages->size()) {
+ return true;
+ }
+ if (pending->batch->send_trailing_metadata &&
+ !retry_state->started_send_trailing_metadata) {
+ return true;
+ }
+ return false;
+}
+
+// For any pending batch containing an op that has not yet been started,
+// adds the pending batch's completion closures to closures.
+static void add_closures_to_fail_unstarted_pending_batches(
+ grpc_call_element* elem, subchannel_call_retry_state* retry_state,
+ grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ if (pending_batch_is_unstarted(pending, calld, retry_state)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: failing unstarted pending batch at index "
+ "%" PRIuPTR,
+ chand, calld, i);
+ }
+ closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
+ "failing on_complete for pending batch");
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+ }
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+// Runs necessary closures upon completion of a call attempt.
+static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
+ grpc_error* error) {
+ grpc_call_element* elem = batch_data->elem;
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
+ // Construct list of closures to execute.
+ grpc_core::CallCombinerClosureList closures;
+ // First, add closure for recv_trailing_metadata_ready.
+ add_closure_for_recv_trailing_metadata_ready(
+ elem, batch_data, GRPC_ERROR_REF(error), &closures);
+ // If there are deferred recv_initial_metadata_ready or recv_message_ready
+ // callbacks, add them to closures.
+ add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
+ // Add closures to fail any pending batches that have not yet been started.
+ add_closures_to_fail_unstarted_pending_batches(
+ elem, retry_state, GRPC_ERROR_REF(error), &closures);
+ // Don't need batch_data anymore.
+ batch_data_unref(batch_data);
+ // Schedule all of the closures identified above.
+ // Note: This will release the call combiner.
+ closures.RunClosures(calld->call_combiner);
+ GRPC_ERROR_UNREF(error);
+}
+
+// Intercepts recv_trailing_metadata_ready callback for retries.
+// Commits the call and returns the trailing metadata up the stack.
+static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
+ subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+ grpc_call_element* elem = batch_data->elem;
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
+ chand, calld, grpc_error_string(error));
+ }
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
+ retry_state->completed_recv_trailing_metadata = true;
+ // Get the call's status and check for server pushback metadata.
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_mdelem* server_pushback_md = nullptr;
+ get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
+ &server_pushback_md);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
+ calld, grpc_status_code_to_string(status));
+ }
+ // Check if we should retry.
+ if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+ // Unref batch_data for deferred recv_initial_metadata_ready or
+ // recv_message_ready callbacks, if any.
+ if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+ }
+ if (retry_state->recv_message_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_message_error);
+ }
+ batch_data_unref(batch_data);
+ return;
+ }
+ // Not retrying, so commit the call.
+ retry_commit(elem, retry_state);
+ // Run any necessary closures.
+ run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
+}
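[Editor's note] This function establishes the new division of labor: recv_trailing_metadata_ready is now the single place where the retry decision is made, and each of the other intercepted callbacks in this diff (recv_initial_metadata_ready, recv_message_ready, and on_complete below) gains an early-out once a retry has been dispatched. The recurring guard, schematically:

    // Common guard added in this diff: after a retry is dispatched, the
    // results of ops still in flight on the old attempt are dropped.
    if (retry_state->retry_dispatched) {
      GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                              "op completion after retry dispatched");
      return;
    }

The reason strings in the actual hunks name the specific op being dropped.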
+
+//
+// on_complete callback handling
+//
+
+// Adds the on_complete closure for the pending batch completed in
+// batch_data to closures.
+static void add_closure_for_completed_pending_batch(
+ grpc_call_element* elem, subchannel_batch_data* batch_data,
+ subchannel_call_retry_state* retry_state, grpc_error* error,
+ grpc_core::CallCombinerClosureList* closures) {
+ pending_batch* pending = pending_batch_find(
+ elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
+ // Match the pending batch with the same set of send ops as the
+ // subchannel batch we've just completed.
+ return batch->on_complete != nullptr &&
+ batch_data->batch.send_initial_metadata ==
+ batch->send_initial_metadata &&
+ batch_data->batch.send_message == batch->send_message &&
+ batch_data->batch.send_trailing_metadata ==
+ batch->send_trailing_metadata;
+ });
+ // If batch_data is a replay batch, then there will be no pending
+ // batch to complete.
+ if (pending == nullptr) {
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ // Add closure.
+ closures->Add(pending->batch->on_complete, error,
+ "on_complete for pending batch");
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+}
+
// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
-// start_retriable_subchannel_batches(), updating *num_closures as needed.
+// start_retriable_subchannel_batches().
static void add_closures_for_replay_or_pending_send_ops(
grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, closure_to_execute* closures,
- size_t* num_closures) {
+ subchannel_call_retry_state* retry_state,
+ grpc_core::CallCombinerClosureList* closures) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
bool have_pending_send_message_ops =
@@ -1929,93 +2001,12 @@ static void add_closures_for_replay_or_pending_send_ops(
"chand=%p calld=%p: starting next batch for pending send op(s)",
chand, calld);
}
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(
- &batch_data->batch.handler_private.closure,
- start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
- closure->error = GRPC_ERROR_NONE;
- closure->reason = "starting next batch for send_* op(s)";
- }
-}
-
-// For any pending batch completed in batch_data, adds the necessary
-// completion closures to closures, updating *num_closures as needed.
-static void add_closures_for_completed_pending_batches(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, grpc_error* error,
- closure_to_execute* closures, size_t* num_closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_completed(pending, calld, retry_state)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
- chand, calld, i);
- }
- // Copy the trailing metadata to return it to the surface.
- if (batch_data->batch.recv_trailing_metadata) {
- grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
- pending->batch->payload->recv_trailing_metadata
- .recv_trailing_metadata);
- }
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->on_complete;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "on_complete for pending batch";
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
- }
+ GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
+ start_retriable_subchannel_batches, elem,
+ grpc_schedule_on_exec_ctx);
+ closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
+ "starting next batch for send_* op(s)");
}
- GRPC_ERROR_UNREF(error);
-}
-
-// For any pending batch containing an op that has not yet been started,
-// adds the pending batch's completion closures to closures, updating
-// *num_closures as needed.
-static void add_closures_to_fail_unstarted_pending_batches(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_unstarted(pending, calld, retry_state)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: failing unstarted pending batch at index "
- "%" PRIuPTR,
- chand, calld, i);
- }
- if (pending->batch->recv_initial_metadata) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->payload->recv_initial_metadata
- .recv_initial_metadata_ready;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason =
- "failing recv_initial_metadata_ready for pending batch";
- pending->batch->payload->recv_initial_metadata
- .recv_initial_metadata_ready = nullptr;
- }
- if (pending->batch->recv_message) {
- *pending->batch->payload->recv_message.recv_message = nullptr;
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure =
- pending->batch->payload->recv_message.recv_message_ready;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "failing recv_message_ready for pending batch";
- pending->batch->payload->recv_message.recv_message_ready = nullptr;
- }
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = pending->batch->on_complete;
- closure->error = GRPC_ERROR_REF(error);
- closure->reason = "failing on_complete for pending batch";
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
- }
- }
- GRPC_ERROR_UNREF(error);
}
// Callback used to intercept on_complete from subchannel calls.
@@ -2035,136 +2026,49 @@ static void on_complete(void* arg, grpc_error* error) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
- // If we have previously completed recv_trailing_metadata, then the
- // call is finished.
- bool call_finished = retry_state->completed_recv_trailing_metadata;
- // Record whether we were already committed before receiving this callback.
- const bool previously_committed = calld->retry_committed;
// Update bookkeeping in retry_state.
- update_retry_state_for_completed_batch(batch_data, retry_state);
- if (call_finished) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
- calld);
- }
- } else {
- // Check if this batch finished the call, and if so, get its status.
- // The call is finished if either (a) this callback was invoked with
- // an error or (b) we receive status.
- grpc_status_code status = GRPC_STATUS_OK;
- grpc_mdelem* server_pushback_md = nullptr;
- if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a).
- call_finished = true;
- grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
- nullptr);
- } else if (batch_data->batch.recv_trailing_metadata) { // Case (b).
- call_finished = true;
- grpc_metadata_batch* md_batch =
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata;
- GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
- status = grpc_get_status_code_from_metadata(
- md_batch->idx.named.grpc_status->md);
- if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
- server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
- }
- }
- // If the call just finished, check if we should retry.
- if (call_finished) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
- calld, grpc_status_code_to_string(status));
- }
- if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
- // Unref batch_data for deferred recv_initial_metadata_ready or
- // recv_message_ready callbacks, if any.
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_initial_metadata_ready_deferred_batch !=
- nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
- }
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_message_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_message_error);
- }
- // Track number of pending subchannel send batches and determine if
- // this was the last one.
- bool last_callback_complete = false;
- if (batch_data->batch.send_initial_metadata ||
- batch_data->batch.send_message ||
- batch_data->batch.send_trailing_metadata) {
- --calld->num_pending_retriable_subchannel_send_batches;
- last_callback_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
- }
- batch_data_unref(batch_data);
- // If we just completed the last subchannel send batch, unref the
- // call stack.
- if (last_callback_complete) {
- GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
- }
- return;
- }
- // Not retrying, so commit the call.
- retry_commit(elem, retry_state);
- }
+ if (batch_data->batch.send_initial_metadata) {
+ retry_state->completed_send_initial_metadata = true;
+ }
+ if (batch_data->batch.send_message) {
+ ++retry_state->completed_send_message_count;
}
- // If we were already committed before receiving this callback, free
- // cached data for send ops that we've just completed. (If the call has
- // just now finished, the call to retry_commit() above will have freed all
- // cached send ops, so we don't need to do it here.)
- if (previously_committed) {
+ if (batch_data->batch.send_trailing_metadata) {
+ retry_state->completed_send_trailing_metadata = true;
+ }
+ // If the call is committed, free cached data for send ops that we've just
+ // completed.
+ if (calld->retry_committed) {
free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
}
- // Call not being retried.
// Construct list of closures to execute.
- // Max number of closures is number of pending batches plus one for
- // each of:
- // - recv_initial_metadata_ready (either deferred or unstarted)
- // - recv_message_ready (either deferred or unstarted)
- // - starting a new batch for pending send ops
- closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
- size_t num_closures = 0;
- // If there are deferred recv_initial_metadata_ready or recv_message_ready
- // callbacks, add them to closures.
- add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
- &num_closures);
- // Find pending batches whose ops are now complete and add their
- // on_complete callbacks to closures.
- add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
- GRPC_ERROR_REF(error), closures,
- &num_closures);
- // Add closures to handle any pending batches that have not yet been started.
- // If the call is finished, we fail these batches; otherwise, we add a
- // callback to start_retriable_subchannel_batches() to start them on
- // the subchannel call.
- if (call_finished) {
- add_closures_to_fail_unstarted_pending_batches(
- elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
- } else {
- add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
- closures, &num_closures);
+ grpc_core::CallCombinerClosureList closures;
+ // If a retry was already dispatched, that means we saw
+ // recv_trailing_metadata before this, so we do nothing here.
+ // Otherwise, invoke the callback to return the result to the surface.
+ if (!retry_state->retry_dispatched) {
+ // Add closure for the completed pending batch, if any.
+ add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
+ GRPC_ERROR_REF(error), &closures);
+ // If needed, add a callback to start any replay or pending send ops on
+ // the subchannel call.
+ if (!retry_state->completed_recv_trailing_metadata) {
+ add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
+ &closures);
+ }
}
// Track number of pending subchannel send batches and determine if this
// was the last one.
- bool last_callback_complete = false;
- if (batch_data->batch.send_initial_metadata ||
- batch_data->batch.send_message ||
- batch_data->batch.send_trailing_metadata) {
- --calld->num_pending_retriable_subchannel_send_batches;
- last_callback_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
- }
+ --calld->num_pending_retriable_subchannel_send_batches;
+ const bool last_send_batch_complete =
+ calld->num_pending_retriable_subchannel_send_batches == 0;
// Don't need batch_data anymore.
batch_data_unref(batch_data);
// Schedule all of the closures identified above.
// Note: This yields the call combiner.
- execute_closures_in_call_combiner(elem, "on_complete", closures,
- num_closures);
- // If we just completed the last subchannel send batch, unref the call stack.
- if (last_callback_complete) {
+ closures.RunClosures(calld->call_combiner);
+ // If this was the last subchannel send batch, unref the call stack.
+ if (last_send_batch_complete) {
GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
}
}
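[Editor's note] Because batch_data_create() now sets on_complete only for batches that contain send ops (see the has_send_ops computation below), this callback can decrement the counter unconditionally instead of first checking for send ops. The ref/unref pairing on the call stack, gathered from the two halves of this diff:

    // Ref side, taken when the first retriable send batch is started
    // (see start_retriable_subchannel_batches below):
    if (calld->num_pending_retriable_subchannel_send_batches == 0) {
      GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
    }
    ++calld->num_pending_retriable_subchannel_send_batches;

    // Unref side, when a send batch completes (on_complete above):
    --calld->num_pending_retriable_subchannel_send_batches;
    if (calld->num_pending_retriable_subchannel_send_batches == 0) {
      GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
    }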
@@ -2185,27 +2089,22 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
// Adds a closure to closures that will execute batch in the call combiner.
static void add_closure_for_subchannel_batch(
- call_data* calld, grpc_transport_stream_op_batch* batch,
- closure_to_execute* closures, size_t* num_closures) {
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
+ grpc_core::CallCombinerClosureList* closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
start_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = &batch->handler_private.closure;
- closure->error = GRPC_ERROR_NONE;
- // If the tracer is enabled, we log a more detailed message, which
- // requires dynamic allocation. This will be freed in
- // start_retriable_subchannel_batches().
if (grpc_client_channel_trace.enabled()) {
char* batch_str = grpc_transport_stream_op_batch_string(batch);
- gpr_asprintf(const_cast<char**>(&closure->reason),
- "starting batch in call combiner: %s", batch_str);
+ gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
+ calld, batch_str);
gpr_free(batch_str);
- closure->free_reason = true;
- } else {
- closure->reason = "start_subchannel_batch";
}
+ closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+ "start_subchannel_batch");
}
// Adds retriable send_initial_metadata op to batch_data.
@@ -2341,9 +2240,13 @@ static void add_retriable_recv_trailing_metadata_op(
grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
&batch_data->recv_trailing_metadata;
- batch_data->batch.collect_stats = true;
- batch_data->batch.payload->collect_stats.collect_stats =
+ batch_data->batch.payload->recv_trailing_metadata.collect_stats =
&batch_data->collect_stats;
+ GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, batch_data,
+ grpc_schedule_on_exec_ctx);
+ batch_data->batch.payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
}
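[Editor's note] This hunk depends on the new transport-level contract (the transport.h and transport.cc entries in the diffstat): collect_stats moves from a standalone batch flag into the recv_trailing_metadata payload, and transports signal trailing-metadata completion through the dedicated recv_trailing_metadata_ready closure rather than folding it into on_complete. From the transport's side, the hand-off is roughly as follows (an illustrative sketch, not the literal transport code):

    // When trailing metadata has been received and the stats have been
    // filled in, the transport invokes the op's dedicated callback.
    GRPC_CLOSURE_SCHED(
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
        GRPC_ERROR_REF(error));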
// Helper function used to start a recv_trailing_metadata batch. This
@@ -2364,9 +2267,11 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Create batch_data with 2 refs, since this batch will be unreffed twice:
- // once when the subchannel batch returns, and again when we actually get
- // a recv_trailing_metadata op from the surface.
- subchannel_batch_data* batch_data = batch_data_create(elem, 2);
+ // once for the recv_trailing_metadata_ready callback when the subchannel
+ // batch returns, and again when we actually get a recv_trailing_metadata
+ // op from the surface.
+ subchannel_batch_data* batch_data =
+ batch_data_create(elem, 2, false /* set_on_complete */);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
retry_state->recv_trailing_metadata_internal_batch = batch_data;
// Note: This will release the call combiner.
@@ -2391,7 +2296,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
"send_initial_metadata op",
chand, calld);
}
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
add_retriable_send_initial_metadata_op(calld, retry_state,
replay_batch_data);
}
@@ -2408,7 +2313,8 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data =
+ batch_data_create(elem, 1, true /* set_on_complete */);
}
add_retriable_send_message_op(elem, retry_state, replay_batch_data);
}
@@ -2427,7 +2333,8 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
chand, calld);
}
if (replay_batch_data == nullptr) {
- replay_batch_data = batch_data_create(elem, 1);
+ replay_batch_data =
+ batch_data_create(elem, 1, true /* set_on_complete */);
}
add_retriable_send_trailing_metadata_op(calld, retry_state,
replay_batch_data);
@@ -2439,7 +2346,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// *num_batches as needed.
static void add_subchannel_batches_for_pending_batches(
grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- closure_to_execute* closures, size_t* num_closures) {
+ grpc_core::CallCombinerClosureList* closures) {
call_data* calld = static_cast<call_data*>(elem->call_data);
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
@@ -2495,13 +2402,11 @@ static void add_subchannel_batches_for_pending_batches(
if (retry_state->completed_recv_trailing_metadata) {
subchannel_batch_data* batch_data =
retry_state->recv_trailing_metadata_internal_batch;
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = &batch_data->on_complete;
// Batches containing recv_trailing_metadata always succeed.
- closure->error = GRPC_ERROR_NONE;
- closure->reason =
- "re-executing on_complete for recv_trailing_metadata "
- "to propagate internally triggered result";
+ closures->Add(
+ &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
+ "re-executing recv_trailing_metadata_ready to propagate "
+ "internally triggered result");
} else {
batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
}
@@ -2513,14 +2418,19 @@ static void add_subchannel_batches_for_pending_batches(
if (calld->method_params == nullptr ||
calld->method_params->retry_policy() == nullptr ||
calld->retry_committed) {
- add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
+ add_closure_for_subchannel_batch(elem, batch, closures);
pending_batch_clear(calld, pending);
continue;
}
// Create batch with the right number of callbacks.
- const int num_callbacks =
- 1 + batch->recv_initial_metadata + batch->recv_message;
- subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
+ const bool has_send_ops = batch->send_initial_metadata ||
+ batch->send_message ||
+ batch->send_trailing_metadata;
+ const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
+ batch->recv_message +
+ batch->recv_trailing_metadata;
+ subchannel_batch_data* batch_data = batch_data_create(
+ elem, num_callbacks, has_send_ops /* set_on_complete */);
// Cache send ops if needed.
maybe_cache_send_ops_for_batch(calld, pending);
// send_initial_metadata.
@@ -2547,11 +2457,9 @@ static void add_subchannel_batches_for_pending_batches(
}
// recv_trailing_metadata.
if (batch->recv_trailing_metadata) {
- GPR_ASSERT(batch->collect_stats);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
}
- add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
- num_closures);
+ add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (batch->send_initial_metadata || batch->send_message ||
@@ -2579,15 +2487,13 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
// Construct list of closures to execute, one for each pending batch.
- // We can start up to 6 batches.
- closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_closures = 0;
+ grpc_core::CallCombinerClosureList closures;
// Replay previously-returned send_* ops if needed.
subchannel_batch_data* replay_batch_data =
maybe_create_subchannel_batch_for_replay(elem, retry_state);
if (replay_batch_data != nullptr) {
- add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
- &num_closures);
+ add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
+ &closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (calld->num_pending_retriable_subchannel_send_batches == 0) {
@@ -2596,17 +2502,16 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
++calld->num_pending_retriable_subchannel_send_batches;
}
// Now add pending batches.
- add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
- &num_closures);
+ add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
// Start batches on subchannel call.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" retriable batches on subchannel_call=%p",
- chand, calld, num_closures, calld->subchannel_call);
+ chand, calld, closures.size(), calld->subchannel_call);
}
- execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
- closures, num_closures);
+ // Note: This will yield the call combiner.
+ closures.RunClosures(calld->call_combiner);
}
//
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 3c40ae14b8..f4f6444c5f 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -414,10 +414,10 @@ void AresDnsResolver::StartResolvingLocked() {
resolving_ = true;
lb_addresses_ = nullptr;
service_config_json_ = nullptr;
- pending_request_ = grpc_dns_lookup_ares(
+ pending_request_ = grpc_dns_lookup_ares_locked(
dns_server_, name_to_resolve_, kDefaultPort, interested_parties_,
&on_resolved_, &lb_addresses_, true /* check_grpclb */,
- request_service_config_ ? &service_config_json_ : nullptr);
+ request_service_config_ ? &service_config_json_ : nullptr, combiner());
last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now();
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
index 6239549534..27d1511d94 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
@@ -29,25 +29,27 @@ typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
/* Start \a ev_driver. It will keep working until all IO on its ares_channel is
done, or grpc_ares_ev_driver_destroy() is called. It may notify the callbacks
bound to its ares_channel when necessary. */
-void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver);
+void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver);
/* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to
\a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the
query. */
-ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver);
+ares_channel* grpc_ares_ev_driver_get_channel_locked(
+ grpc_ares_ev_driver* ev_driver);
/* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is
created successfully. */
-grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
- grpc_pollset_set* pollset_set);
+grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
+ grpc_pollset_set* pollset_set,
+ grpc_combiner* combiner);
/* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver
will be cancelled and their on_done callbacks will be invoked with a status
of ARES_ECANCELLED. */
-void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver);
+void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver);
/* Shutdown all the grpc_fds used by \a ev_driver */
-void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver);
+void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \
*/
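[Editor's note] The _locked suffix follows the gRPC-core convention that these functions must be invoked from within the combiner supplied to grpc_ares_ev_driver_create_locked(); callers outside the combiner hop into it first. A minimal sketch of that hop, assuming the combiner-scheduler API of this era of the codebase (the helper name here is invented):

    // Illustrative only: enter the combiner before touching the driver,
    // so the *_locked functions above may be called safely in the body.
    static void shutdown_in_combiner(void* arg, grpc_error* error) {
      grpc_ares_ev_driver* ev_driver = static_cast<grpc_ares_ev_driver*>(arg);
      grpc_ares_ev_driver_shutdown_locked(ev_driver);
    }

    // From outside the combiner:
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(shutdown_in_combiner, ev_driver,
                            grpc_combiner_scheduler(combiner)),
        GRPC_ERROR_NONE);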
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
index f496e9694d..b73e979e9f 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
@@ -39,17 +39,15 @@
typedef struct fd_node {
/** the owner of this fd node */
grpc_ares_ev_driver* ev_driver;
- /** a closure wrapping on_readable_cb, which should be invoked when the
- grpc_fd in this node becomes readable. */
+ /** a closure wrapping on_readable_locked, which should be
+ invoked when the grpc_fd in this node becomes readable. */
grpc_closure read_closure;
- /** a closure wrapping on_writable_cb, which should be invoked when the
- grpc_fd in this node becomes writable. */
+ /** a closure wrapping on_writable_locked, which should be
+ invoked when the grpc_fd in this node becomes writable. */
grpc_closure write_closure;
/** next fd node in the list */
struct fd_node* next;
- /** mutex guarding the rest of the state */
- gpr_mu mu;
/** the grpc_fd owned by this fd node */
grpc_fd* fd;
/** if the readable closure has been registered */
@@ -68,8 +66,8 @@ struct grpc_ares_ev_driver {
/** refcount of the event driver */
gpr_refcount refs;
- /** mutex guarding the rest of the state */
- gpr_mu mu;
+ /** combiner to synchronize c-ares and I/O callbacks on */
+ grpc_combiner* combiner;
/** a list of grpc_fd that this event driver is currently using. */
fd_node* fds;
/** is this event driver currently working? */
@@ -92,19 +90,18 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
if (gpr_unref(&ev_driver->refs)) {
gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
GPR_ASSERT(ev_driver->fds == nullptr);
- gpr_mu_destroy(&ev_driver->mu);
+ GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
ares_destroy(ev_driver->channel);
gpr_free(ev_driver);
}
}
-static void fd_node_destroy(fd_node* fdn) {
+static void fd_node_destroy_locked(fd_node* fdn) {
gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd));
GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered);
GPR_ASSERT(fdn->already_shutdown);
- gpr_mu_destroy(&fdn->mu);
- /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
+ /* The c-ares library will close the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */
int dummy_release_fd;
@@ -119,15 +116,16 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
}
}
-grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
- grpc_pollset_set* pollset_set) {
+grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
+ grpc_pollset_set* pollset_set,
+ grpc_combiner* combiner) {
*ev_driver = static_cast<grpc_ares_ev_driver*>(
gpr_malloc(sizeof(grpc_ares_ev_driver)));
ares_options opts;
memset(&opts, 0, sizeof(opts));
opts.flags |= ARES_FLAG_STAYOPEN;
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
- gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create");
+ gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked");
if (status != ARES_SUCCESS) {
char* err_msg;
gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s",
@@ -137,7 +135,7 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
gpr_free(*ev_driver);
return err;
}
- gpr_mu_init(&(*ev_driver)->mu);
+ (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver");
gpr_ref_init(&(*ev_driver)->refs, 1);
(*ev_driver)->pollset_set = pollset_set;
(*ev_driver)->fds = nullptr;
@@ -146,34 +144,26 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
return GRPC_ERROR_NONE;
}
-void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver) {
- // It's not safe to shut down remaining fds here directly, because
- // ares_host_callback does not provide an exec_ctx. We mark the event driver
- // as being shut down. If the event driver is working,
- // grpc_ares_notify_on_event_locked will shut down the fds; if it's not
- // working, there are no fds to shut down.
- gpr_mu_lock(&ev_driver->mu);
+void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) {
+ // We mark the event driver as being shut down. If the event driver
+ // is working, grpc_ares_notify_on_event_locked will shut down the
+ // fds; if it's not working, there are no fds to shut down.
ev_driver->shutting_down = true;
- gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver);
}
-void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver) {
- gpr_mu_lock(&ev_driver->mu);
+void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) {
ev_driver->shutting_down = true;
fd_node* fn = ev_driver->fds;
while (fn != nullptr) {
- gpr_mu_lock(&fn->mu);
fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
- gpr_mu_unlock(&fn->mu);
fn = fn->next;
}
- gpr_mu_unlock(&ev_driver->mu);
}
// Search for \a fd in the fd_node list pointed to by \a head. This is an O(n)
// search; the max possible value of n is ARES_GETSOCK_MAXNUM (16), and n is
// typically 1-2 in our tests.
-static fd_node* pop_fd_node(fd_node** head, int fd) {
+static fd_node* pop_fd_node_locked(fd_node** head, int fd) {
fd_node dummy_head;
dummy_head.next = *head;
fd_node* node = &dummy_head;
@@ -190,24 +180,22 @@ static fd_node* pop_fd_node(fd_node** head, int fd) {
}
/* Check if \a fd is still readable */
-static bool grpc_ares_is_fd_still_readable(grpc_ares_ev_driver* ev_driver,
- int fd) {
+static bool grpc_ares_is_fd_still_readable_locked(
+ grpc_ares_ev_driver* ev_driver, int fd) {
size_t bytes_available = 0;
return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0;
}
-static void on_readable_cb(void* arg, grpc_error* error) {
+static void on_readable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
- gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->readable_registered = false;
- gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "readable on %d", fd);
if (error == GRPC_ERROR_NONE) {
do {
ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD);
- } while (grpc_ares_is_fd_still_readable(ev_driver, fd));
+ } while (grpc_ares_is_fd_still_readable_locked(ev_driver, fd));
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
@@ -217,19 +205,15 @@ static void on_readable_cb(void* arg, grpc_error* error) {
// grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel);
}
- gpr_mu_lock(&ev_driver->mu);
grpc_ares_notify_on_event_locked(ev_driver);
- gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver);
}
-static void on_writable_cb(void* arg, grpc_error* error) {
+static void on_writable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
- gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->writable_registered = false;
- gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "writable on %d", fd);
if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd);
@@ -242,13 +226,12 @@ static void on_writable_cb(void* arg, grpc_error* error) {
// grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel);
}
- gpr_mu_lock(&ev_driver->mu);
grpc_ares_notify_on_event_locked(ev_driver);
- gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver);
}
-ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver) {
+ares_channel* grpc_ares_ev_driver_get_channel_locked(
+ grpc_ares_ev_driver* ev_driver) {
return &ev_driver->channel;
}
@@ -263,7 +246,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
- fd_node* fdn = pop_fd_node(&ev_driver->fds, socks[i]);
+ fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]);
// Create a new fd_node if sock[i] is not in the fd_node list.
if (fdn == nullptr) {
char* fd_name;
@@ -275,17 +258,15 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
fdn->readable_registered = false;
fdn->writable_registered = false;
fdn->already_shutdown = false;
- gpr_mu_init(&fdn->mu);
- GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn,
- grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
+ grpc_combiner_scheduler(ev_driver->combiner));
+ GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
+ grpc_combiner_scheduler(ev_driver->combiner));
grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd);
gpr_free(fd_name);
}
fdn->next = new_list;
new_list = fdn;
- gpr_mu_lock(&fdn->mu);
// Register read_closure if the socket is readable and read_closure has
// not been registered with this socket.
if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
@@ -305,7 +286,6 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure);
fdn->writable_registered = true;
}
- gpr_mu_unlock(&fdn->mu);
}
}
}
@@ -315,15 +295,12 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
while (ev_driver->fds != nullptr) {
fd_node* cur = ev_driver->fds;
ev_driver->fds = ev_driver->fds->next;
- gpr_mu_lock(&cur->mu);
fd_node_shutdown_locked(cur, "c-ares fd shutdown");
if (!cur->readable_registered && !cur->writable_registered) {
- gpr_mu_unlock(&cur->mu);
- fd_node_destroy(cur);
+ fd_node_destroy_locked(cur);
} else {
cur->next = new_list;
new_list = cur;
- gpr_mu_unlock(&cur->mu);
}
}
ev_driver->fds = new_list;
@@ -334,13 +311,11 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
}
}
-void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) {
- gpr_mu_lock(&ev_driver->mu);
+void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
if (!ev_driver->working) {
ev_driver->working = true;
grpc_ares_notify_on_event_locked(ev_driver);
}
- gpr_mu_unlock(&ev_driver->mu);
}
#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */
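The essence of this file's change, condensed: the per-driver and per-fd mutexes disappear because every c-ares and I/O callback is now scheduled on the combiner, which serializes them. Before and after, taken from the hunks above:

  /* Before: callbacks ran on the exec_ctx scheduler and had to take
     gpr_mu locks internally. */
  GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn,
                    grpc_schedule_on_exec_ctx);
  /* After: the combiner scheduler guarantees mutual exclusion, so the
     callbacks may assume exclusive access on entry. */
  GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
                    grpc_combiner_scheduler(ev_driver->combiner));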
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index 18d0a7b9f6..471de58e8c 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -65,8 +65,6 @@ struct grpc_ares_request {
/** number of ongoing queries */
gpr_refcount pending_queries;
- /** mutex guarding the rest of the state */
- gpr_mu mu;
/** is there at least one successful query, set in on_done_cb */
bool success;
/** the errors explaining the request failure, set in on_done_cb */
@@ -74,7 +72,8 @@ struct grpc_ares_request {
};
typedef struct grpc_ares_hostbyname_request {
- /** following members are set in create_hostbyname_request */
+ /** following members are set in create_hostbyname_request_locked
+ */
/** the top-level request instance */
grpc_ares_request* parent_request;
/** host to resolve, parsed from the name to resolve */
@@ -96,10 +95,6 @@ static uint16_t strhtons(const char* port) {
return htons(static_cast<unsigned short>(atoi(port)));
}
-static void grpc_ares_request_ref(grpc_ares_request* r) {
- gpr_ref(&r->pending_queries);
-}
-
static void log_address_sorting_list(grpc_lb_addresses* lb_addrs,
const char* input_output_str) {
for (size_t i = 0; i < lb_addrs->num_addresses; i++) {
@@ -149,7 +144,11 @@ void grpc_cares_wrapper_test_only_address_sorting_sort(
grpc_cares_wrapper_address_sorting_sort(lb_addrs);
}
-static void grpc_ares_request_unref(grpc_ares_request* r) {
+static void grpc_ares_request_ref_locked(grpc_ares_request* r) {
+ gpr_ref(&r->pending_queries);
+}
+
+static void grpc_ares_request_unref_locked(grpc_ares_request* r) {
/* If there are no pending queries, invoke on_done callback and destroy the
request */
if (gpr_unref(&r->pending_queries)) {
@@ -158,13 +157,12 @@ static void grpc_ares_request_unref(grpc_ares_request* r) {
grpc_cares_wrapper_address_sorting_sort(lb_addrs);
}
GRPC_CLOSURE_SCHED(r->on_done, r->error);
- gpr_mu_destroy(&r->mu);
- grpc_ares_ev_driver_destroy(r->ev_driver);
+ grpc_ares_ev_driver_destroy_locked(r->ev_driver);
gpr_free(r);
}
}
-static grpc_ares_hostbyname_request* create_hostbyname_request(
+static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
grpc_ares_request* parent_request, char* host, uint16_t port,
bool is_balancer) {
grpc_ares_hostbyname_request* hr = static_cast<grpc_ares_hostbyname_request*>(
@@ -173,22 +171,22 @@ static grpc_ares_hostbyname_request* create_hostbyname_request(
hr->host = gpr_strdup(host);
hr->port = port;
hr->is_balancer = is_balancer;
- grpc_ares_request_ref(parent_request);
+ grpc_ares_request_ref_locked(parent_request);
return hr;
}
-static void destroy_hostbyname_request(grpc_ares_hostbyname_request* hr) {
- grpc_ares_request_unref(hr->parent_request);
+static void destroy_hostbyname_request_locked(
+ grpc_ares_hostbyname_request* hr) {
+ grpc_ares_request_unref_locked(hr->parent_request);
gpr_free(hr->host);
gpr_free(hr);
}
-static void on_hostbyname_done_cb(void* arg, int status, int timeouts,
- struct hostent* hostent) {
+static void on_hostbyname_done_locked(void* arg, int status, int timeouts,
+ struct hostent* hostent) {
grpc_ares_hostbyname_request* hr =
static_cast<grpc_ares_hostbyname_request*>(arg);
grpc_ares_request* r = hr->parent_request;
- gpr_mu_lock(&r->mu);
if (status == ARES_SUCCESS) {
GRPC_ERROR_UNREF(r->error);
r->error = GRPC_ERROR_NONE;
@@ -263,33 +261,33 @@ static void on_hostbyname_done_cb(void* arg, int status, int timeouts,
r->error = grpc_error_add_child(error, r->error);
}
}
- gpr_mu_unlock(&r->mu);
- destroy_hostbyname_request(hr);
+ destroy_hostbyname_request_locked(hr);
}
-static void on_srv_query_done_cb(void* arg, int status, int timeouts,
- unsigned char* abuf, int alen) {
+static void on_srv_query_done_locked(void* arg, int status, int timeouts,
+ unsigned char* abuf, int alen) {
grpc_ares_request* r = static_cast<grpc_ares_request*>(arg);
- gpr_log(GPR_DEBUG, "on_query_srv_done_cb");
+ gpr_log(GPR_DEBUG, "on_query_srv_done_locked");
if (status == ARES_SUCCESS) {
- gpr_log(GPR_DEBUG, "on_query_srv_done_cb ARES_SUCCESS");
+ gpr_log(GPR_DEBUG, "on_query_srv_done_locked ARES_SUCCESS");
struct ares_srv_reply* reply;
const int parse_status = ares_parse_srv_reply(abuf, alen, &reply);
if (parse_status == ARES_SUCCESS) {
- ares_channel* channel = grpc_ares_ev_driver_get_channel(r->ev_driver);
+ ares_channel* channel =
+ grpc_ares_ev_driver_get_channel_locked(r->ev_driver);
for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr;
srv_it = srv_it->next) {
if (grpc_ipv6_loopback_available()) {
- grpc_ares_hostbyname_request* hr = create_hostbyname_request(
+ grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked(
r, srv_it->host, htons(srv_it->port), true /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET6,
- on_hostbyname_done_cb, hr);
+ on_hostbyname_done_locked, hr);
}
- grpc_ares_hostbyname_request* hr = create_hostbyname_request(
+ grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked(
r, srv_it->host, htons(srv_it->port), true /* is_balancer */);
- ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb,
- hr);
- grpc_ares_ev_driver_start(r->ev_driver);
+ ares_gethostbyname(*channel, hr->host, AF_INET,
+ on_hostbyname_done_locked, hr);
+ grpc_ares_ev_driver_start_locked(r->ev_driver);
}
}
if (reply != nullptr) {
@@ -307,21 +305,20 @@ static void on_srv_query_done_cb(void* arg, int status, int timeouts,
r->error = grpc_error_add_child(error, r->error);
}
}
- grpc_ares_request_unref(r);
+ grpc_ares_request_unref_locked(r);
}
static const char g_service_config_attribute_prefix[] = "grpc_config=";
-static void on_txt_done_cb(void* arg, int status, int timeouts,
- unsigned char* buf, int len) {
- gpr_log(GPR_DEBUG, "on_txt_done_cb");
+static void on_txt_done_locked(void* arg, int status, int timeouts,
+ unsigned char* buf, int len) {
+ gpr_log(GPR_DEBUG, "on_txt_done_locked");
char* error_msg;
grpc_ares_request* r = static_cast<grpc_ares_request*>(arg);
const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1;
struct ares_txt_ext* result = nullptr;
struct ares_txt_ext* reply = nullptr;
grpc_error* error = GRPC_ERROR_NONE;
- gpr_mu_lock(&r->mu);
if (status != ARES_SUCCESS) goto fail;
status = ares_parse_txt_reply_ext(buf, len, &reply);
if (status != ARES_SUCCESS) goto fail;
@@ -366,14 +363,14 @@ fail:
r->error = grpc_error_add_child(error, r->error);
}
done:
- gpr_mu_unlock(&r->mu);
- grpc_ares_request_unref(r);
+ grpc_ares_request_unref_locked(r);
}
-static grpc_ares_request* grpc_dns_lookup_ares_impl(
+static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) {
+ grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
+ grpc_combiner* combiner) {
grpc_error* error = GRPC_ERROR_NONE;
grpc_ares_hostbyname_request* hr = nullptr;
grpc_ares_request* r = nullptr;
@@ -402,20 +399,19 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl(
}
port = gpr_strdup(default_port);
}
-
grpc_ares_ev_driver* ev_driver;
- error = grpc_ares_ev_driver_create(&ev_driver, interested_parties);
+ error = grpc_ares_ev_driver_create_locked(&ev_driver, interested_parties,
+ combiner);
if (error != GRPC_ERROR_NONE) goto error_cleanup;
r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request)));
- gpr_mu_init(&r->mu);
r->ev_driver = ev_driver;
r->on_done = on_done;
r->lb_addrs_out = addrs;
r->service_config_json_out = service_config_json;
r->success = false;
r->error = GRPC_ERROR_NONE;
- channel = grpc_ares_ev_driver_get_channel(r->ev_driver);
+ channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver);
// If dns_server is specified, use it.
if (dns_server != nullptr) {
@@ -457,32 +453,34 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl(
}
gpr_ref_init(&r->pending_queries, 1);
if (grpc_ipv6_loopback_available()) {
- hr = create_hostbyname_request(r, host, strhtons(port),
- false /* is_balancer */);
- ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_cb, hr);
+ hr = create_hostbyname_request_locked(r, host, strhtons(port),
+ false /* is_balancer */);
+ ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_locked,
+ hr);
}
- hr = create_hostbyname_request(r, host, strhtons(port),
- false /* is_balancer */);
- ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, hr);
+ hr = create_hostbyname_request_locked(r, host, strhtons(port),
+ false /* is_balancer */);
+ ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_locked,
+ hr);
if (check_grpclb) {
/* Query the SRV record */
- grpc_ares_request_ref(r);
+ grpc_ares_request_ref_locked(r);
char* service_name;
gpr_asprintf(&service_name, "_grpclb._tcp.%s", host);
- ares_query(*channel, service_name, ns_c_in, ns_t_srv, on_srv_query_done_cb,
- r);
+ ares_query(*channel, service_name, ns_c_in, ns_t_srv,
+ on_srv_query_done_locked, r);
gpr_free(service_name);
}
if (service_config_json != nullptr) {
- grpc_ares_request_ref(r);
+ grpc_ares_request_ref_locked(r);
char* config_name;
gpr_asprintf(&config_name, "_grpc_config.%s", host);
- ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_cb, r);
+ ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_locked,
+ r);
gpr_free(config_name);
}
- /* TODO(zyc): Handle CNAME records here. */
- grpc_ares_ev_driver_start(r->ev_driver);
- grpc_ares_request_unref(r);
+ grpc_ares_ev_driver_start_locked(r->ev_driver);
+ grpc_ares_request_unref_locked(r);
gpr_free(host);
gpr_free(port);
return r;
@@ -494,15 +492,15 @@ error_cleanup:
return nullptr;
}
-grpc_ares_request* (*grpc_dns_lookup_ares)(
+grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_lb_addresses** addrs, bool check_grpclb,
- char** service_config_json) = grpc_dns_lookup_ares_impl;
+ grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
+ grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
void grpc_cancel_ares_request(grpc_ares_request* r) {
- if (grpc_dns_lookup_ares == grpc_dns_lookup_ares_impl) {
- grpc_ares_ev_driver_shutdown(r->ev_driver);
+ if (grpc_dns_lookup_ares_locked == grpc_dns_lookup_ares_locked_impl) {
+ grpc_ares_ev_driver_shutdown_locked(r->ev_driver);
}
}
@@ -534,6 +532,8 @@ void grpc_ares_cleanup(void) {
*/
typedef struct grpc_resolve_address_ares_request {
+ /** combiner that queries and related callbacks run under */
+ grpc_combiner* combiner;
/** the pointer to receive the resolved addresses */
grpc_resolved_addresses** addrs_out;
/** currently resolving lb addresses */
@@ -541,8 +541,14 @@ typedef struct grpc_resolve_address_ares_request {
/** closure to call when the resolve_address_ares request completes */
grpc_closure* on_resolve_address_done;
/** a closure wrapping on_dns_lookup_done_cb, which should be invoked when the
- grpc_dns_lookup_ares operation is done. */
+ grpc_dns_lookup_ares_locked operation is done. */
grpc_closure on_dns_lookup_done;
+ /** target name */
+ const char* name;
+ /** default port to use if none is specified */
+ const char* default_port;
+ /** pollset_set to be driven by */
+ grpc_pollset_set* interested_parties;
} grpc_resolve_address_ares_request;
static void on_dns_lookup_done_cb(void* arg, grpc_error* error) {
@@ -566,9 +572,20 @@ static void on_dns_lookup_done_cb(void* arg, grpc_error* error) {
}
GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error));
if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs);
+ GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb");
gpr_free(r);
}
+static void grpc_resolve_address_invoke_dns_lookup_ares_locked(
+ void* arg, grpc_error* unused_error) {
+ grpc_resolve_address_ares_request* r =
+ static_cast<grpc_resolve_address_ares_request*>(arg);
+ grpc_dns_lookup_ares_locked(
+ nullptr /* dns_server */, r->name, r->default_port, r->interested_parties,
+ &r->on_dns_lookup_done, &r->lb_addrs, false /* check_grpclb */,
+ nullptr /* service_config_json */, r->combiner);
+}
+
static void grpc_resolve_address_ares_impl(const char* name,
const char* default_port,
grpc_pollset_set* interested_parties,
@@ -577,14 +594,18 @@ static void grpc_resolve_address_ares_impl(const char* name,
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(
gpr_zalloc(sizeof(grpc_resolve_address_ares_request)));
+ r->combiner = grpc_combiner_create();
r->addrs_out = addrs;
r->on_resolve_address_done = on_done;
GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r,
grpc_schedule_on_exec_ctx);
- grpc_dns_lookup_ares(nullptr /* dns_server */, name, default_port,
- interested_parties, &r->on_dns_lookup_done, &r->lb_addrs,
- false /* check_grpclb */,
- nullptr /* service_config_json */);
+ r->name = name;
+ r->default_port = default_port;
+ r->interested_parties = interested_parties;
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_CREATE(grpc_resolve_address_invoke_dns_lookup_ares_locked, r,
+ grpc_combiner_scheduler(r->combiner)),
+ GRPC_ERROR_NONE);
}
void (*grpc_resolve_address_ares)(
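The ref-counting above decides when on_done fires and when the event driver is destroyed. A worked trace for a lookup with both SRV and TXT queries enabled, following grpc_dns_lookup_ares_locked_impl (assuming IPv6 loopback is unavailable, so a single A query is issued):

  gpr_ref_init(&r->pending_queries, 1);   // count = 1: the lookup itself
  create_hostbyname_request_locked(...);  // count = 2: the A query
  grpc_ares_request_ref_locked(r);        // count = 3: the SRV query
  grpc_ares_request_ref_locked(r);        // count = 4: the TXT query
  grpc_ares_ev_driver_start_locked(r->ev_driver);
  grpc_ares_request_unref_locked(r);      // count = 3: drop the lookup's ref
  // Each query callback ends in grpc_ares_request_unref_locked(); when
  // the count reaches zero, on_done is scheduled and the ev_driver is
  // destroyed.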
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index 2d84a038d6..9e93d0cf94 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -48,11 +48,11 @@ extern void (*grpc_resolve_address_ares)(const char* name,
function. \a on_done may be called directly in this function without being
scheduled with \a exec_ctx, so it must not try to acquire locks that are
being held by the caller. */
-extern grpc_ares_request* (*grpc_dns_lookup_ares)(
+extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addresses, bool check_grpclb,
- char** service_config_json);
+ char** service_config_json, grpc_combiner* combiner);
/* Cancel the pending grpc_ares_request \a request */
void grpc_cancel_ares_request(grpc_ares_request* request);
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
index 5096e480bc..d6a76fc8b6 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
@@ -26,18 +26,19 @@ struct grpc_ares_request {
char val;
};
-static grpc_ares_request* grpc_dns_lookup_ares_impl(
+static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) {
+ grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
+ grpc_combiner* combiner) {
return NULL;
}
-grpc_ares_request* (*grpc_dns_lookup_ares)(
+grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_lb_addresses** addrs, bool check_grpclb,
- char** service_config_json) = grpc_dns_lookup_ares_impl;
+ grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
+ grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
void grpc_cancel_ares_request(grpc_ares_request* r) {}
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc
index e0a41a3637..d23ad67ad5 100644
--- a/src/core/ext/filters/deadline/deadline_filter.cc
+++ b/src/core/ext/filters/deadline/deadline_filter.cc
@@ -128,21 +128,25 @@ static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
}
}
-// Callback run when the call is complete.
-static void on_complete(void* arg, grpc_error* error) {
+// Callback run when we receive trailing metadata.
+static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
cancel_timer_if_needed(deadline_state);
- // Invoke the next callback.
- GRPC_CLOSURE_RUN(deadline_state->next_on_complete, GRPC_ERROR_REF(error));
+ // Invoke the original callback.
+ GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error));
}
-// Inject our own on_complete callback into op.
-static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
- grpc_transport_stream_op_batch* op) {
- deadline_state->next_on_complete = op->on_complete;
- GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state,
+// Inject our own recv_trailing_metadata_ready callback into op.
+static void inject_recv_trailing_metadata_ready(
+ grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
+ deadline_state->original_recv_trailing_metadata_ready =
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, deadline_state,
grpc_schedule_on_exec_ctx);
- op->on_complete = &deadline_state->on_complete;
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &deadline_state->recv_trailing_metadata_ready;
}
// Callback and associated state for starting the timer after call stack
@@ -226,7 +230,7 @@ void grpc_deadline_state_client_start_transport_stream_op_batch(
// Make sure we know when the call is complete, so that we can cancel
// the timer.
if (op->recv_trailing_metadata) {
- inject_on_complete_cb(deadline_state, op);
+ inject_recv_trailing_metadata_ready(deadline_state, op);
}
}
}
@@ -322,7 +326,7 @@ static void server_start_transport_stream_op_batch(
// the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata) {
- inject_on_complete_cb(&calld->base.deadline_state, op);
+ inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
}
}
// Chain to next filter.
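Both this filter and http_client_filter below use the same interception idiom: stash the closure that the layer above installed in the batch payload, substitute our own, and chain to the original when ours runs. Schematically (the my_* names are placeholders for the filter's own state and callback):

  static void my_recv_trailing_metadata_ready(void* arg, grpc_error* error) {
    my_call_data* calld = static_cast<my_call_data*>(arg);
    /* ... filter-specific work, e.g. cancel_timer_if_needed() ... */
    GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready,
                     GRPC_ERROR_REF(error));
  }

  /* At batch-start time: */
  calld->original_recv_trailing_metadata_ready =
      op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
  op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &calld->recv_trailing_metadata_ready;  // wraps the callback above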
diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h
index 13207cbd6f..1d797f445a 100644
--- a/src/core/ext/filters/deadline/deadline_filter.h
+++ b/src/core/ext/filters/deadline/deadline_filter.h
@@ -37,12 +37,12 @@ typedef struct grpc_deadline_state {
grpc_deadline_timer_state timer_state;
grpc_timer timer;
grpc_closure timer_callback;
- // Closure to invoke when the call is complete.
+ // Closure to invoke when we receive trailing metadata.
// We use this to cancel the timer.
- grpc_closure on_complete;
- // The original on_complete closure, which we chain to after our own
- // closure is invoked.
- grpc_closure* next_on_complete;
+ grpc_closure recv_trailing_metadata_ready;
+ // The original recv_trailing_metadata_ready closure, which we chain to
+ // after our own closure is invoked.
+ grpc_closure* original_recv_trailing_metadata_ready;
} grpc_deadline_state;
//
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index ae94ce47b9..1678051beb 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -55,8 +55,8 @@ struct call_data {
grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops.
grpc_metadata_batch* recv_trailing_metadata;
- grpc_closure* original_recv_trailing_metadata_on_complete;
- grpc_closure recv_trailing_metadata_on_complete;
+ grpc_closure* original_recv_trailing_metadata_ready;
+ grpc_closure recv_trailing_metadata_ready;
// State for handling send_message ops.
grpc_transport_stream_op_batch* send_message_batch;
size_t send_message_bytes_read;
@@ -153,8 +153,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error);
}
-static void recv_trailing_metadata_on_complete(void* user_data,
- grpc_error* error) {
+static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error == GRPC_ERROR_NONE) {
@@ -163,7 +162,7 @@ static void recv_trailing_metadata_on_complete(void* user_data,
} else {
GRPC_ERROR_REF(error);
}
- GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_on_complete, error);
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
}
static void send_message_on_complete(void* arg, grpc_error* error) {
@@ -312,8 +311,10 @@ static void hc_start_transport_stream_op_batch(
/* substitute our callback for the higher callback */
calld->recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
- calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
- batch->on_complete = &calld->recv_trailing_metadata_on_complete;
+ calld->original_recv_trailing_metadata_ready =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready;
}
grpc_error* error = GRPC_ERROR_NONE;
@@ -420,8 +421,8 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
- recv_trailing_metadata_on_complete, elem,
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
elem, grpc_schedule_on_exec_ctx);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index a8090d18a6..0d6b72c66e 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1149,12 +1149,10 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
}
}
-/* Flag that this closure barrier wants stats to be updated before finishing */
-#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
/* Flag that this closure barrier may be covering a write in a pollset, and so
we should not complete this closure until we can prove that the write got
scheduled */
-#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
/* First bit of the reference count, stored in the high order bits (with the low
bits being used for flags defined above) */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
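For context, the barrier arithmetic these constants feed: next_data.scratch holds a refcount in the bits at and above CLOSURE_BARRIER_FIRST_REF_BIT and flags in the low 16 bits. A sketch of add_closure_barrier, which is defined elsewhere in this file:

  static grpc_closure* add_closure_barrier(grpc_closure* closure) {
    closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
    return closure;
  }
  // grpc_chttp2_complete_closure_step() subtracts one ref per finished
  // sub-op; once scratch < CLOSURE_BARRIER_FIRST_REF_BIT only flags are
  // left and the closure may run, subject to MAY_COVER_WRITE. With the
  // stats bit deleted, MAY_COVER_WRITE shifts down to bit 0.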
@@ -1206,10 +1204,6 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
grpc_error_add_child(closure->error_data.error, error);
}
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
- if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) {
- grpc_transport_move_stats(&s->stats, s->collecting_stats);
- s->collecting_stats = nullptr;
- }
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
GRPC_CLOSURE_RUN(closure, closure->error_data.error);
@@ -1351,9 +1345,14 @@ static void perform_stream_op_locked(void* stream_op,
}
grpc_closure* on_complete = op->on_complete;
+ // TODO(roth): This is a hack needed because we use data inside of the
+ // closure itself to do the barrier calculation (i.e., to ensure that
+ // we don't schedule the closure until all ops in the batch have been
+ // completed). This can go away once we move to a new C++ closure API
+ // that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
- on_complete =
- GRPC_CLOSURE_CREATE(do_nothing, nullptr, grpc_schedule_on_exec_ctx);
+ on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
+ nullptr, grpc_schedule_on_exec_ctx);
}
/* use final_data as a barrier until enqueue time; the initial counter is
@@ -1361,12 +1360,6 @@ static void perform_stream_op_locked(void* stream_op,
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error_data.error = GRPC_ERROR_NONE;
- if (op->collect_stats) {
- GPR_ASSERT(s->collecting_stats == nullptr);
- s->collecting_stats = op_payload->collect_stats.collect_stats;
- on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
- }
-
if (op->cancel_stream) {
GRPC_STATS_INC_HTTP2_OP_CANCEL();
grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
@@ -1600,8 +1593,11 @@ static void perform_stream_op_locked(void* stream_op,
if (op->recv_trailing_metadata) {
GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
+ GPR_ASSERT(s->collecting_stats == nullptr);
+ s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
- s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
+ s->recv_trailing_metadata_finished =
+ op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
s->recv_trailing_metadata =
op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true;
@@ -1960,11 +1956,12 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
}
if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
s->recv_trailing_metadata_finished != nullptr) {
+ grpc_transport_move_stats(&s->stats, s->collecting_stats);
+ s->collecting_stats = nullptr;
grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
s->recv_trailing_metadata);
- grpc_chttp2_complete_closure_step(
- t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
- "recv_trailing_metadata_finished");
+ null_then_run_closure(&s->recv_trailing_metadata_finished,
+ GRPC_ERROR_NONE);
}
}
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 420c2d13e1..4a252d972d 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -925,6 +925,10 @@ static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
result = false;
}
/* Check if every op that was asked for is done. */
+ /* TODO(muxi): We should not consider the recv ops here, since they
+ * have their own callbacks. We should invoke a batch's on_complete
+ * as soon as all of the batch's send ops are complete, even if
+ * there are still recv ops pending. */
else if (curr_op->send_initial_metadata &&
!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
@@ -1280,12 +1284,20 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
op_can_be_run(stream_op, s, &oas->state,
OP_RECV_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
- if (oas->s->state.rs.trailing_metadata_valid) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ error = GRPC_ERROR_REF(stream_state->cancel_error);
+ } else if (stream_state->state_op_done[OP_FAILED]) {
+ error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
+ } else if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.trailing_metadata,
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
+ GRPC_CLOSURE_SCHED(
+ stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ error);
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_op->cancel_stream &&
@@ -1398,6 +1410,11 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
+ if (op->recv_trailing_metadata) {
+ GRPC_CLOSURE_SCHED(
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_CANCELLED);
+ }
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
return;
}
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 2c3bff5c1e..b0ca7f8207 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -120,7 +120,6 @@ typedef struct inproc_stream {
struct inproc_stream* stream_list_next;
} inproc_stream;
-static grpc_closure do_nothing_closure;
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
static void op_state_machine(void* arg, grpc_error* error);
@@ -373,6 +372,10 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
const char* msg) {
int is_sm = static_cast<int>(op == s->send_message_op);
int is_stm = static_cast<int>(op == s->send_trailing_md_op);
+ // TODO(vjpai): We should not consider the recv ops here, since they
+ // have their own callbacks. We should invoke a batch's on_complete
+ // as soon as all of the batch's send ops are complete, even if there
+ // are still recv ops pending.
int is_rim = static_cast<int>(op == s->recv_initial_md_op);
int is_rm = static_cast<int>(op == s->recv_message_op);
int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
@@ -496,6 +499,11 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
s->send_trailing_md_op = nullptr;
}
if (s->recv_trailing_md_op) {
+ INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
+ s, error);
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error));
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
s, error);
complete_if_batch_end_locked(
@@ -639,6 +647,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
s->trailing_md_sent = true;
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO,
+ "op_state_machine %p scheduling trailing-metadata-ready", s);
+ GRPC_CLOSURE_SCHED(
+ s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_NONE);
+ INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete", s);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
@@ -711,6 +725,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
}
if (s->recv_trailing_md_op && s->t->is_client && other &&
other->send_message_op) {
+ INPROC_LOG(GPR_INFO,
+ "op_state_machine %p scheduling trailing-metadata-ready %p", s,
+ GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_NONE);
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
}
if (s->to_read_trailing_md_filled) {
@@ -766,6 +786,10 @@ static void op_state_machine(void* arg, grpc_error* error) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err);
+ GRPC_CLOSURE_SCHED(
+ s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(new_err));
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = nullptr;
@@ -859,6 +883,9 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
+ GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(s->cancel_self_error));
complete_if_batch_end_locked(
s, s->cancel_self_error, s->recv_trailing_md_op,
"cancel_stream scheduling trailing-md-on-complete");
@@ -873,6 +900,8 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
return ret;
}
+static void do_nothing(void* arg, grpc_error* error) {}
+
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@@ -892,8 +921,14 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
}
grpc_error* error = GRPC_ERROR_NONE;
grpc_closure* on_complete = op->on_complete;
+ // TODO(roth): This is a hack needed because we use data inside of the
+ // closure itself to do the barrier calculation (i.e., to ensure that
+ // we don't schedule the closure until all ops in the batch have been
+ // completed). This can go away once we move to a new C++ closure API
+ // that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
- on_complete = &do_nothing_closure;
+ on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
+ nullptr, grpc_schedule_on_exec_ctx);
}
if (op->cancel_stream) {
@@ -1026,6 +1061,15 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
+ if (op->recv_trailing_metadata) {
+ INPROC_LOG(
+ GPR_INFO,
+ "perform_stream_op error %p scheduling trailing-metadata-ready %p",
+ s, error);
+ GRPC_CLOSURE_SCHED(
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error));
+ }
}
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
error);
@@ -1129,12 +1173,8 @@ static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
/*******************************************************************************
* GLOBAL INIT AND DESTROY
*/
-static void do_nothing(void* arg, grpc_error* error) {}
-
void grpc_inproc_transport_init(void) {
grpc_core::ExecCtx exec_ctx;
- GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
- grpc_schedule_on_exec_ctx);
g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
grpc_slice key_tmp = grpc_slice_from_static_string(":path");
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index ddd3029402..e2ea334ded 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -51,6 +51,7 @@ typedef struct connected_channel_call_data {
callback_state on_complete[6]; // Max number of pending batches.
callback_state recv_initial_metadata_ready;
callback_state recv_message_ready;
+ callback_state recv_trailing_metadata_ready;
} call_data;
static void run_in_call_combiner(void* arg, grpc_error* error) {
@@ -111,6 +112,12 @@ static void con_start_transport_stream_op_batch(
intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
+ if (batch->recv_trailing_metadata) {
+ callback_state* state = &calld->recv_trailing_metadata_ready;
+ intercept_callback(
+ calld, state, false, "recv_trailing_metadata_ready",
+ &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
+ }
if (batch->cancel_stream) {
// There can be more than one cancellation batch in flight at any
// given time, so we can't just pick out a fixed index into
@@ -121,7 +128,7 @@ static void con_start_transport_stream_op_batch(
static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
intercept_callback(calld, state, true, "on_complete (cancel_stream)",
&batch->on_complete);
- } else {
+ } else if (batch->on_complete != nullptr) {
callback_state* state = get_state_for_batch(calld, batch);
intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
}
diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h
index f36f6cb706..0d2586e507 100644
--- a/src/core/lib/gprpp/inlined_vector.h
+++ b/src/core/lib/gprpp/inlined_vector.h
@@ -99,6 +99,8 @@ class InlinedVector {
void push_back(T&& value) { emplace_back(std::move(value)); }
size_t size() const { return size_; }
+ bool empty() const { return size_ == 0; }
+
size_t capacity() const { return capacity_; }
void clear() {
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index 0ccd08ea57..641fa18082 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -26,6 +26,7 @@
#include <grpc/support/atm.h>
#include "src/core/lib/gpr/mpscq.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/iomgr/closure.h"
// A simple, lock-free mechanism for serializing activity related to a
@@ -109,4 +110,83 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
grpc_error* error);
+namespace grpc_core {
+
+// Helper for running a list of closures in a call combiner.
+//
+// Each callback running in the call combiner will eventually be
+// returned to the surface, at which point the surface will yield the
+// call combiner. So when we are running in the call combiner and have
+// more than one callback to return to the surface, we need to re-enter
+// the call combiner for all but one of those callbacks.
+class CallCombinerClosureList {
+ public:
+ CallCombinerClosureList() {}
+
+ // Adds a closure to the list. The closure must eventually result in
+ // the call combiner being yielded.
+ void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
+ closures_.emplace_back(closure, error, reason);
+ }
+
+ // Runs all closures in the call combiner and yields the call combiner.
+ //
+ // All but one of the closures in the list will be scheduled via
+ // GRPC_CALL_COMBINER_START(), and the remaining closure will be
+ // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
+ // yielding the call combiner. If the list is empty, then the call
+ // combiner will be yielded immediately.
+ void RunClosures(grpc_call_combiner* call_combiner) {
+ if (closures_.empty()) {
+ GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
+ return;
+ }
+ for (size_t i = 1; i < closures_.size(); ++i) {
+ auto& closure = closures_[i];
+ GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+ closure.reason);
+ }
+ if (grpc_call_combiner_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "CallCombinerClosureList executing closure while already "
+ "holding call_combiner %p: closure=%p error=%s reason=%s",
+ call_combiner, closures_[0].closure,
+ grpc_error_string(closures_[0].error), closures_[0].reason);
+ }
+ // This will release the call combiner.
+ GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
+ closures_.clear();
+ }
+
+ // Runs all closures in the call combiner, but does NOT yield the call
+ // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
+ void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
+ for (size_t i = 0; i < closures_.size(); ++i) {
+ auto& closure = closures_[i];
+ GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+ closure.reason);
+ }
+ closures_.clear();
+ }
+
+ size_t size() const { return closures_.size(); }
+
+ private:
+ struct CallCombinerClosure {
+ grpc_closure* closure;
+ grpc_error* error;
+ const char* reason;
+
+ CallCombinerClosure(grpc_closure* closure, grpc_error* error,
+ const char* reason)
+ : closure(closure), error(error), reason(reason) {}
+ };
+
+ // There are generally a maximum of 6 closures to run in the call
+ // combiner, one for each pending op.
+ InlinedVector<CallCombinerClosure, 6> closures_;
+};
+
+} // namespace grpc_core
+
#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
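A usage sketch mirroring how the new helper is exercised by this change in transport.cc (and by pending_batches_fail in client_channel.cc):

  grpc_core::CallCombinerClosureList closures;
  if (batch->recv_initial_metadata) {
    closures.Add(
        batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
        GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
  }
  if (batch->on_complete != nullptr) {
    closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
                 "failing on_complete");
  }
  // All but one closure re-enter the combiner via
  // GRPC_CALL_COMBINER_START(); the last one runs directly and, per the
  // class contract, eventually yields the call combiner.
  closures.RunClosures(call_combiner);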
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 34a494485d..f14c723844 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -283,9 +283,10 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) {
if (c->scheduled) {
gpr_log(GPR_ERROR,
"Closure already scheduled. (closure: %p, created: [%s:%d], "
- "previously scheduled at: [%s: %d] run?: %s",
+ "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
+ "run?: %s",
c, c->file_created, c->line_created, c->file_initiated,
- c->line_initiated, c->run ? "true" : "false");
+ c->line_initiated, file, line, c->run ? "true" : "false");
abort();
}
c->scheduled = true;
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 1cf8ea94e7..8b224b6e7b 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -233,6 +233,7 @@ struct grpc_call {
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
+ grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags;
grpc_closure release_call;
@@ -270,8 +271,17 @@ struct grpc_call {
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
-#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack*)((call) + 1))
-#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call*)(call_stack)) - 1)
+/* Given a size, round up to the next multiple of GPR_MAX_ALIGNMENT */
+#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
+ (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
+
+#define CALL_STACK_FROM_CALL(call) \
+ (grpc_call_stack*)((char*)(call) + \
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
+#define CALL_FROM_CALL_STACK(call_stack) \
+ (grpc_call*)(((char*)(call_stack)) - \
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
+
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
@@ -342,8 +352,9 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
gpr_arena* arena = gpr_arena_create(initial_size);
- call = static_cast<grpc_call*>(gpr_arena_alloc(
- arena, sizeof(grpc_call) + channel_stack->call_stack_size));
+ call = static_cast<grpc_call*>(
+ gpr_arena_alloc(arena, ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
+ channel_stack->call_stack_size));
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
grpc_call_combiner_init(&call->call_combiner);
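A quick check of the rounding macro, assuming GPR_MAX_ALIGNMENT is 16 (a stand-in constant is used so the sketch is self-contained):

  #define MY_MAX_ALIGNMENT 16u  // stand-in for GPR_MAX_ALIGNMENT
  #define MY_ROUND_UP(x) \
    (((x) + MY_MAX_ALIGNMENT - 1u) & ~(MY_MAX_ALIGNMENT - 1u))
  static_assert(MY_ROUND_UP(1000) == 1008,
                "rounds up to the next 16-byte boundary");
  static_assert(MY_ROUND_UP(1008) == 1008, "aligned sizes are unchanged");
  // Arena layout: [ grpc_call | padding | grpc_call_stack ... ], so
  // CALL_STACK_FROM_CALL no longer assumes sizeof(grpc_call) is itself
  // a multiple of the platform alignment.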
@@ -1209,7 +1220,6 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
-
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
@@ -1217,14 +1227,9 @@ static void post_batch_completion(batch_control* bctl) {
}
if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
-
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
- grpc_metadata_batch* md =
- &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(call, md);
-
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
parent_call* pc = get_parent_call(call);
@@ -1246,7 +1251,6 @@ static void post_batch_completion(batch_control* bctl) {
}
gpr_mu_unlock(&pc->child_list_mu);
}
-
if (call->is_client) {
get_final_status(call, set_status_value_directly,
call->final_op.client.status,
@@ -1256,7 +1260,6 @@ static void post_batch_completion(batch_control* bctl) {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
-
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@@ -1538,6 +1541,19 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
finish_batch_step(bctl);
}
+static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
+ batch_control* bctl = static_cast<batch_control*>(bctlp);
+ grpc_call* call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
+ add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_metadata_batch* md =
+ &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+ recv_trailing_filter(call, md);
+ }
+ finish_batch_step(bctl);
+}
+
static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
@@ -1558,7 +1574,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t i;
const grpc_op* op;
batch_control* bctl;
- int num_completion_callbacks_needed = 1;
+ bool has_send_ops = false;
+ int num_recv_ops = 0;
grpc_call_error error = GRPC_CALL_OK;
grpc_transport_stream_op_batch* stream_op;
grpc_transport_stream_op_batch_payload* stream_op_payload;
@@ -1664,6 +1681,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->send_initial_metadata.peer_string =
&call->peer_string;
}
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_MESSAGE: {
@@ -1693,6 +1711,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
&op->data.send_message.send_message->data.raw.slice_buffer, flags);
stream_op_payload->send_message.send_message.reset(
call->sending_stream.get());
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1713,6 +1732,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->sent_final_op = true;
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@@ -1777,6 +1797,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+ has_send_ops = true;
break;
}
case GRPC_OP_RECV_INITIAL_METADATA: {
@@ -1804,7 +1825,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->recv_initial_metadata.peer_string =
&call->peer_string;
}
- num_completion_callbacks_needed++;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_MESSAGE: {
@@ -1826,7 +1847,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
- num_completion_callbacks_needed++;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@@ -1852,11 +1873,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.client.error_string =
op->data.recv_status_on_client.error_string;
stream_op->recv_trailing_metadata = true;
- stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->collect_stats.collect_stats =
+ stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
+ GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+ receiving_trailing_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->receiving_trailing_metadata_ready;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@@ -1877,11 +1903,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
stream_op->recv_trailing_metadata = true;
- stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->collect_stats.collect_stats =
+ stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
+ GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+ receiving_trailing_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->receiving_trailing_metadata_ready;
+ ++num_recv_ops;
break;
}
}
@@ -1891,13 +1922,15 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
-  gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
-  GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
-                    grpc_schedule_on_exec_ctx);
-  stream_op->on_complete = &bctl->finish_batch;
-  gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
+  gpr_ref_init(&bctl->steps_to_complete,
+               (has_send_ops ? 1 : 0) + num_recv_ops);
+  if (has_send_ops) {
+    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
+                      grpc_schedule_on_exec_ctx);
+    stream_op->on_complete = &bctl->finish_batch;
+  }
+  gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
execute_batch(call, stream_op, &bctl->start_batch);
done:
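
A minimal sketch of the completion accounting implemented by the
has_send_ops/num_recv_ops change above: all send ops in a batch share the
single finish_batch on_complete callback, while each recv op completes
through its own ready callback. The helper name below is hypothetical,
for illustration only:

    // Hypothetical helper mirroring the gpr_ref_init() count above.
    int CompletionsNeeded(bool has_send_ops, int num_recv_ops) {
      // One completion covers all send ops; each recv op adds one more.
      return (has_send_ops ? 1 : 0) + num_recv_ops;
    }
    // Example: a unary client batch with three send ops and three recv ops
    // (initial metadata, message, status) needs 1 + 3 = 4 completions.
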
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index 039d603394..cbdb77c844 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -212,21 +212,32 @@ void grpc_transport_stream_op_batch_finish_with_failure(
if (batch->send_message) {
batch->payload->send_message.send_message.reset();
}
- if (batch->recv_message) {
- GRPC_CALL_COMBINER_START(
- call_combiner, batch->payload->recv_message.recv_message_ready,
- GRPC_ERROR_REF(error), "failing recv_message_ready");
+ if (batch->cancel_stream) {
+ GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
}
+ // Construct a list of closures to execute.
+ grpc_core::CallCombinerClosureList closures;
if (batch->recv_initial_metadata) {
- GRPC_CALL_COMBINER_START(
- call_combiner,
+ closures.Add(
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
}
- GRPC_CLOSURE_SCHED(batch->on_complete, error);
- if (batch->cancel_stream) {
- GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
+ if (batch->recv_message) {
+ closures.Add(batch->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error), "failing recv_message_ready");
+ }
+ if (batch->recv_trailing_metadata) {
+ closures.Add(
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
+ }
+ if (batch->on_complete != nullptr) {
+ closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
+ "failing on_complete");
}
+ // Execute closures.
+ closures.RunClosures(call_combiner);
+ GRPC_ERROR_UNREF(error);
}
typedef struct {
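
The rewritten failure path above follows a collect-then-run pattern with
grpc_core::CallCombinerClosureList: Add() takes a closure, an error ref,
and a reason string, and RunClosures() hands the whole set to the call
combiner. A rough usage sketch, assuming only the two methods this patch
itself uses (the closure names are placeholders):

    grpc_core::CallCombinerClosureList closures;
    // Each Add() transfers one ref on the error to the closure list.
    closures.Add(recv_ready_closure, GRPC_ERROR_REF(error),
                 "failing recv op");
    closures.Add(on_complete_closure, GRPC_ERROR_REF(error),
                 "failing on_complete");
    // Executes all collected closures against the call combiner.
    closures.RunClosures(call_combiner);
    // The caller still owns its original ref, released here.
    GRPC_ERROR_UNREF(error);
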
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index b2e252d939..585b9dfae9 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -122,9 +122,15 @@ typedef struct grpc_transport_stream_op_batch_payload
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op_batch {
- /** Should be enqueued when all requested operations (excluding recv_message
- and recv_initial_metadata which have their own closures) in a given batch
- have been completed. */
+ /** Should be scheduled when all of the non-recv operations in the batch
+ are complete.
+
+ The recv ops (recv_initial_metadata, recv_message, and
+ recv_trailing_metadata) each have their own callbacks. If a batch
+ contains both recv ops and non-recv ops, on_complete should be
+ scheduled as soon as the non-recv ops are complete, regardless of
+ whether or not the recv ops are complete. If a batch contains
+ only recv ops, on_complete can be null. */
grpc_closure* on_complete;
/** Values for the stream op (fields set are determined by flags above) */
@@ -149,9 +155,6 @@ typedef struct grpc_transport_stream_op_batch {
*/
bool recv_trailing_metadata : 1;
- /** Collect any stats into provided buffer, zero internal stat counters */
- bool collect_stats : 1;
-
/** Cancel this stream with the provided error */
bool cancel_stream : 1;
@@ -219,11 +222,10 @@ struct grpc_transport_stream_op_batch_payload {
struct {
grpc_metadata_batch* recv_trailing_metadata;
- } recv_trailing_metadata;
-
- struct {
grpc_transport_stream_stats* collect_stats;
- } collect_stats;
+    /** Should be enqueued when trailing metadata is ready to be processed. */
+ grpc_closure* recv_trailing_metadata_ready;
+ } recv_trailing_metadata;
/** Forcefully close this stream.
The HTTP2 semantics should be:
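
One practical consequence of the on_complete contract documented above: a
batch containing only recv ops may carry a null on_complete, so transports
must now null-check it before scheduling. A sketch of the check a transport
might perform (illustrative, not code from this patch):

    // on_complete may be null for recv-only batches under the new contract.
    if (batch->on_complete != nullptr) {
      GRPC_CLOSURE_SCHED(batch->on_complete, GRPC_ERROR_NONE);
    }
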
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 25ab492f3a..8c7db642a5 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -120,13 +120,6 @@ char* grpc_transport_stream_op_batch_string(
gpr_strvec_add(&b, tmp);
}
- if (op->collect_stats) {
- gpr_strvec_add(&b, gpr_strdup(" "));
- gpr_asprintf(&tmp, "COLLECT_STATS:%p",
- op->payload->collect_stats.collect_stats);
- gpr_strvec_add(&b, tmp);
- }
-
out = gpr_strvec_flatten(&b, nullptr);
gpr_strvec_destroy(&b);
diff --git a/src/core/tsi/ssl_transport_security.cc b/src/core/tsi/ssl_transport_security.cc
index 8065a8b185..e66fc9ba03 100644
--- a/src/core/tsi/ssl_transport_security.cc
+++ b/src/core/tsi/ssl_transport_security.cc
@@ -260,14 +260,13 @@ static tsi_result ssl_get_x509_common_name(X509* cert, unsigned char** utf8,
X509_NAME* subject_name = X509_get_subject_name(cert);
int utf8_returned_size = 0;
if (subject_name == nullptr) {
- gpr_log(GPR_ERROR, "Could not get subject name from certificate.");
+ gpr_log(GPR_INFO, "Could not get subject name from certificate.");
return TSI_NOT_FOUND;
}
common_name_index =
X509_NAME_get_index_by_NID(subject_name, NID_commonName, -1);
if (common_name_index == -1) {
- gpr_log(GPR_ERROR,
- "Could not get common name of subject from certificate.");
+    gpr_log(GPR_INFO,
+            "Could not get common name of subject from certificate.");
return TSI_NOT_FOUND;
}
common_name_entry = X509_NAME_get_entry(subject_name, common_name_index);
diff --git a/src/php/README.md b/src/php/README.md
index 36e242fe01..5e9fa38763 100644
--- a/src/php/README.md
+++ b/src/php/README.md
@@ -9,7 +9,8 @@ gRPC PHP installation instructions for Google Cloud Platform is in
## Environment
-###Prerequisite:
+### Prerequisite:
+
* `php` 5.5 or above, 7.0 or above
* `pecl`
* `composer`