aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-06-13 16:42:59 -0700
committerGravatar GitHub <noreply@github.com>2018-06-13 16:42:59 -0700
commit3f9308ce1f8cb42c96901c1700f0b9dbb531f186 (patch)
treef4f1c2c12d70487d3abe40877ce23cdd739071e9 /src/core/lib/surface
parent92a0ae0b1081840d2c5a488f66bf6550c1a492f4 (diff)
parent0159026111b4dac15ddd228baee3757554898235 (diff)
Merge pull request #15746 from grpc/revert-15709-recv_trailing_metadata_ready2
Revert "Second attempt: move recv_trailing_metadata into its own callback, don't use on_complete for recv_ops"
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.cc81
1 files changed, 24 insertions, 57 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 8b224b6e7b..1cf8ea94e7 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -233,7 +233,6 @@ struct grpc_call {
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
- grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags;
grpc_closure release_call;
@@ -271,17 +270,8 @@ struct grpc_call {
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
-/* Given a size, round up to the next multiple of sizeof(void*) */
-#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
- (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
-
-#define CALL_STACK_FROM_CALL(call) \
- (grpc_call_stack*)((char*)(call) + \
- ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
-#define CALL_FROM_CALL_STACK(call_stack) \
- (grpc_call*)(((char*)(call_stack)) - \
- ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
-
+#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack*)((call) + 1))
+#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call*)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
@@ -352,9 +342,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
gpr_arena* arena = gpr_arena_create(initial_size);
- call = static_cast<grpc_call*>(
- gpr_arena_alloc(arena, ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
- channel_stack->call_stack_size));
+ call = static_cast<grpc_call*>(gpr_arena_alloc(
+ arena, sizeof(grpc_call) + channel_stack->call_stack_size));
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
grpc_call_combiner_init(&call->call_combiner);
@@ -1220,6 +1209,7 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
+
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
@@ -1227,9 +1217,14 @@ static void post_batch_completion(batch_control* bctl) {
}
if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
+
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
+ grpc_metadata_batch* md =
+ &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+ recv_trailing_filter(call, md);
+
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
parent_call* pc = get_parent_call(call);
@@ -1251,6 +1246,7 @@ static void post_batch_completion(batch_control* bctl) {
}
gpr_mu_unlock(&pc->child_list_mu);
}
+
if (call->is_client) {
get_final_status(call, set_status_value_directly,
call->final_op.client.status,
@@ -1260,6 +1256,7 @@ static void post_batch_completion(batch_control* bctl) {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
+
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@@ -1541,19 +1538,6 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
finish_batch_step(bctl);
}
-static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
- batch_control* bctl = static_cast<batch_control*>(bctlp);
- grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
- if (error == GRPC_ERROR_NONE) {
- grpc_metadata_batch* md =
- &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(call, md);
- }
- finish_batch_step(bctl);
-}
-
static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
@@ -1574,8 +1558,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t i;
const grpc_op* op;
batch_control* bctl;
- bool has_send_ops = false;
- int num_recv_ops = 0;
+ int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
grpc_transport_stream_op_batch* stream_op;
grpc_transport_stream_op_batch_payload* stream_op_payload;
@@ -1681,7 +1664,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->send_initial_metadata.peer_string =
&call->peer_string;
}
- has_send_ops = true;
break;
}
case GRPC_OP_SEND_MESSAGE: {
@@ -1711,7 +1693,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
&op->data.send_message.send_message->data.raw.slice_buffer, flags);
stream_op_payload->send_message.send_message.reset(
call->sending_stream.get());
- has_send_ops = true;
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1732,7 +1713,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->sent_final_op = true;
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
- has_send_ops = true;
break;
}
case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@@ -1797,7 +1777,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
- has_send_ops = true;
break;
}
case GRPC_OP_RECV_INITIAL_METADATA: {
@@ -1825,7 +1804,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->recv_initial_metadata.peer_string =
&call->peer_string;
}
- ++num_recv_ops;
+ num_completion_callbacks_needed++;
break;
}
case GRPC_OP_RECV_MESSAGE: {
@@ -1847,7 +1826,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
- ++num_recv_ops;
+ num_completion_callbacks_needed++;
break;
}
case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@@ -1873,16 +1852,11 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.client.error_string =
op->data.recv_status_on_client.error_string;
stream_op->recv_trailing_metadata = true;
+ stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->recv_trailing_metadata.collect_stats =
+ stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
- GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
- receiving_trailing_metadata_ready, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &call->receiving_trailing_metadata_ready;
- ++num_recv_ops;
break;
}
case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@@ -1903,16 +1877,11 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
stream_op->recv_trailing_metadata = true;
+ stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->recv_trailing_metadata.collect_stats =
+ stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
- GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
- receiving_trailing_metadata_ready, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &call->receiving_trailing_metadata_ready;
- ++num_recv_ops;
break;
}
}
@@ -1922,15 +1891,13 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
- gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
-
- if (has_send_ops) {
- GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op->on_complete = &bctl->finish_batch;
- }
+ gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
+ GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
+
execute_batch(call, stream_op, &bctl->start_batch);
done: