aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-06-19 16:03:12 -0700
committerGravatar ncteisen <ncteisen@gmail.com>2018-06-19 16:03:12 -0700
commit6a025c0f2597453dc3418bf86176e902fe0ac003 (patch)
tree06abef34ba9576217055b6c2ae4f1e776d9b6412 /src
parent68d4f50f77641f80794134f0d4149034df1706f0 (diff)
parentd8772cf88f3a910160349eb21950dcd9619d1b6a (diff)
Merge branch 'master' of https://github.com/grpc/grpc into channelz
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc129
1 files changed, 72 insertions, 57 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 34ea97e23e..520431e63b 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -794,6 +794,15 @@ typedef struct {
// The batch to use in the subchannel call.
// Its payload field points to subchannel_call_retry_state.batch_payload.
grpc_transport_stream_op_batch batch;
+ // For intercepting on_complete.
+ grpc_closure on_complete;
+} subchannel_batch_data;
+
+// Retry state associated with a subchannel call.
+// Stored in the parent_data of the subchannel call object.
+typedef struct {
+ // subchannel_batch_data.batch.payload points to this.
+ grpc_transport_stream_op_batch_payload batch_payload;
// For send_initial_metadata.
// Note that we need to make a copy of the initial metadata for each
// subchannel call instead of just referring to the copy in call_data,
@@ -818,15 +827,6 @@ typedef struct {
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
grpc_closure recv_trailing_metadata_ready;
- // For intercepting on_complete.
- grpc_closure on_complete;
-} subchannel_batch_data;
-
-// Retry state associated with a subchannel call.
-// Stored in the parent_data of the subchannel call object.
-typedef struct {
- // subchannel_batch_data.batch.payload points to this.
- grpc_transport_stream_op_batch_payload batch_payload;
// These fields indicate which ops have been started and completed on
// this subchannel call.
size_t started_send_message_count;
@@ -1524,17 +1524,21 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
static void batch_data_unref(subchannel_batch_data* batch_data) {
if (gpr_unref(&batch_data->refs)) {
- if (batch_data->send_initial_metadata_storage != nullptr) {
- grpc_metadata_batch_destroy(&batch_data->send_initial_metadata);
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
+ if (batch_data->batch.send_initial_metadata) {
+ grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
}
- if (batch_data->send_trailing_metadata_storage != nullptr) {
- grpc_metadata_batch_destroy(&batch_data->send_trailing_metadata);
+ if (batch_data->batch.send_trailing_metadata) {
+ grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
}
if (batch_data->batch.recv_initial_metadata) {
- grpc_metadata_batch_destroy(&batch_data->recv_initial_metadata);
+ grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
}
if (batch_data->batch.recv_trailing_metadata) {
- grpc_metadata_batch_destroy(&batch_data->recv_trailing_metadata);
+ grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
}
GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
@@ -1560,8 +1564,12 @@ static void invoke_recv_initial_metadata_callback(void* arg,
});
GPR_ASSERT(pending != nullptr);
// Return metadata.
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
grpc_metadata_batch_move(
- &batch_data->recv_initial_metadata,
+ &retry_state->recv_initial_metadata,
pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
// Update bookkeeping.
// Note: Need to do this before invoking the callback, since invoking
@@ -1606,7 +1614,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
// the recv_trailing_metadata_ready callback, then defer propagating this
// callback back to the surface. We can evaluate whether to retry when
// recv_trailing_metadata comes back.
- if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
+ if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
if (grpc_client_channel_trace.enabled()) {
@@ -1651,8 +1659,12 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) {
});
GPR_ASSERT(pending != nullptr);
// Return payload.
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
*pending->batch->payload->recv_message.recv_message =
- std::move(batch_data->recv_message);
+ std::move(retry_state->recv_message);
// Update bookkeeping.
// Note: Need to do this before invoking the callback, since invoking
// the callback will result in yielding the call combiner.
@@ -1693,7 +1705,7 @@ static void recv_message_ready(void* arg, grpc_error* error) {
// callback back to the surface. We can evaluate whether to retry when
// recv_trailing_metadata comes back.
if (GPR_UNLIKELY(
- (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
+ (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
@@ -1766,8 +1778,12 @@ static void add_closure_for_recv_trailing_metadata_ready(
return;
}
// Return metadata.
+ subchannel_call_retry_state* retry_state =
+ static_cast<subchannel_call_retry_state*>(
+ grpc_connected_subchannel_call_get_parent_data(
+ batch_data->subchannel_call));
grpc_metadata_batch_move(
- &batch_data->recv_trailing_metadata,
+ &retry_state->recv_trailing_metadata,
pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
// Add closure.
closures->Add(pending->batch->payload->recv_trailing_metadata
@@ -1788,11 +1804,11 @@ static void add_closures_for_deferred_recv_callbacks(
// Add closure for deferred recv_initial_metadata_ready.
if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
nullptr)) {
- GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
+ GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
invoke_recv_initial_metadata_callback,
retry_state->recv_initial_metadata_ready_deferred_batch,
grpc_schedule_on_exec_ctx);
- closures->Add(&batch_data->recv_initial_metadata_ready,
+ closures->Add(&retry_state->recv_initial_metadata_ready,
retry_state->recv_initial_metadata_error,
"resuming recv_initial_metadata_ready");
retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
@@ -1800,11 +1816,11 @@ static void add_closures_for_deferred_recv_callbacks(
// Add closure for deferred recv_message_ready.
if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
nullptr)) {
- GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
+ GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
invoke_recv_message_callback,
retry_state->recv_message_ready_deferred_batch,
grpc_schedule_on_exec_ctx);
- closures->Add(&batch_data->recv_message_ready,
+ closures->Add(&retry_state->recv_message_ready,
retry_state->recv_message_error,
"resuming recv_message_ready");
retry_state->recv_message_ready_deferred_batch = nullptr;
@@ -2120,28 +2136,28 @@ static void add_retriable_send_initial_metadata_op(
//
// If we've already completed one or more attempts, add the
// grpc-retry-attempts header.
- batch_data->send_initial_metadata_storage =
+ retry_state->send_initial_metadata_storage =
static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
calld->arena, sizeof(grpc_linked_mdelem) *
(calld->send_initial_metadata.list.count +
(calld->num_attempts_completed > 0))));
grpc_metadata_batch_copy(&calld->send_initial_metadata,
- &batch_data->send_initial_metadata,
- batch_data->send_initial_metadata_storage);
- if (GPR_UNLIKELY(batch_data->send_initial_metadata.idx.named
+ &retry_state->send_initial_metadata,
+ retry_state->send_initial_metadata_storage);
+ if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
.grpc_previous_rpc_attempts != nullptr)) {
- grpc_metadata_batch_remove(
- &batch_data->send_initial_metadata,
- batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts);
+ grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
+ retry_state->send_initial_metadata.idx.named
+ .grpc_previous_rpc_attempts);
}
if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
grpc_mdelem retry_md = grpc_mdelem_from_slices(
GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
*retry_count_strings[calld->num_attempts_completed - 1]);
grpc_error* error = grpc_metadata_batch_add_tail(
- &batch_data->send_initial_metadata,
- &batch_data->send_initial_metadata_storage[calld->send_initial_metadata
- .list.count],
+ &retry_state->send_initial_metadata,
+ &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
+ .list.count],
retry_md);
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
gpr_log(GPR_ERROR, "error adding retry metadata: %s",
@@ -2152,7 +2168,7 @@ static void add_retriable_send_initial_metadata_op(
retry_state->started_send_initial_metadata = true;
batch_data->batch.send_initial_metadata = true;
batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
- &batch_data->send_initial_metadata;
+ &retry_state->send_initial_metadata;
batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
calld->send_initial_metadata_flags;
batch_data->batch.payload->send_initial_metadata.peer_string =
@@ -2173,10 +2189,10 @@ static void add_retriable_send_message_op(
grpc_core::ByteStreamCache* cache =
(*calld->send_messages)[retry_state->started_send_message_count];
++retry_state->started_send_message_count;
- batch_data->send_message.Init(cache);
+ retry_state->send_message.Init(cache);
batch_data->batch.send_message = true;
batch_data->batch.payload->send_message.send_message.reset(
- batch_data->send_message.get());
+ retry_state->send_message.get());
}
// Adds retriable send_trailing_metadata op to batch_data.
@@ -2186,17 +2202,17 @@ static void add_retriable_send_trailing_metadata_op(
// We need to make a copy of the metadata batch for each attempt, since
// the filters in the subchannel stack may modify this batch, and we don't
// want those modifications to be passed forward to subsequent attempts.
- batch_data->send_trailing_metadata_storage =
+ retry_state->send_trailing_metadata_storage =
static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
calld->arena, sizeof(grpc_linked_mdelem) *
calld->send_trailing_metadata.list.count));
grpc_metadata_batch_copy(&calld->send_trailing_metadata,
- &batch_data->send_trailing_metadata,
- batch_data->send_trailing_metadata_storage);
+ &retry_state->send_trailing_metadata,
+ retry_state->send_trailing_metadata_storage);
retry_state->started_send_trailing_metadata = true;
batch_data->batch.send_trailing_metadata = true;
batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
- &batch_data->send_trailing_metadata;
+ &retry_state->send_trailing_metadata;
}
// Adds retriable recv_initial_metadata op to batch_data.
@@ -2205,16 +2221,16 @@ static void add_retriable_recv_initial_metadata_op(
subchannel_batch_data* batch_data) {
retry_state->started_recv_initial_metadata = true;
batch_data->batch.recv_initial_metadata = true;
- grpc_metadata_batch_init(&batch_data->recv_initial_metadata);
+ grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
- &batch_data->recv_initial_metadata;
+ &retry_state->recv_initial_metadata;
batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
- &batch_data->trailing_metadata_available;
- GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
+ &retry_state->trailing_metadata_available;
+ GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
recv_initial_metadata_ready, batch_data,
grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
- &batch_data->recv_initial_metadata_ready;
+ &retry_state->recv_initial_metadata_ready;
}
// Adds retriable recv_message op to batch_data.
@@ -2224,11 +2240,11 @@ static void add_retriable_recv_message_op(
++retry_state->started_recv_message_count;
batch_data->batch.recv_message = true;
batch_data->batch.payload->recv_message.recv_message =
- &batch_data->recv_message;
- GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, recv_message_ready,
+ &retry_state->recv_message;
+ GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
batch_data, grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_message.recv_message_ready =
- &batch_data->recv_message_ready;
+ &retry_state->recv_message_ready;
}
// Adds retriable recv_trailing_metadata op to batch_data.
@@ -2237,16 +2253,17 @@ static void add_retriable_recv_trailing_metadata_op(
subchannel_batch_data* batch_data) {
retry_state->started_recv_trailing_metadata = true;
batch_data->batch.recv_trailing_metadata = true;
- grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
+ grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
- &batch_data->recv_trailing_metadata;
+ &retry_state->recv_trailing_metadata;
batch_data->batch.payload->recv_trailing_metadata.collect_stats =
- &batch_data->collect_stats;
- GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
+ &retry_state->collect_stats;
+ GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
recv_trailing_metadata_ready, batch_data,
grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
+ .recv_trailing_metadata_ready =
+ &retry_state->recv_trailing_metadata_ready;
}
// Helper function used to start a recv_trailing_metadata batch. This
@@ -2400,11 +2417,9 @@ static void add_subchannel_batches_for_pending_batches(
// started subchannel batch, since we'll propagate the
// completion when it completes.
if (retry_state->completed_recv_trailing_metadata) {
- subchannel_batch_data* batch_data =
- retry_state->recv_trailing_metadata_internal_batch;
// Batches containing recv_trailing_metadata always succeed.
closures->Add(
- &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
+ &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
"re-executing recv_trailing_metadata_ready to propagate "
"internally triggered result");
} else {