diff options
author | Mark D. Roth <roth@google.com> | 2018-06-08 08:37:23 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2018-06-08 08:37:23 -0700 |
commit | 8ae0ff0af2dc27ee82dd0e7174d4ad3112e6cf26 (patch) | |
tree | 2fb55b21217d62e3ed9b66c33bb88fc962526354 /src/core/lib/surface | |
parent | 8b5e45a35b83908d7a5f1dae6f412929f70c90ef (diff) |
Move recv_trailing_metadata into its own callback. Don't use
on_complete for recv_ops.
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.cc | 63 |
1 files changed, 43 insertions, 20 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 1cf8ea94e7..d44846cd12 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -233,6 +233,7 @@ struct grpc_call { grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; grpc_closure receiving_initial_metadata_ready; + grpc_closure receiving_trailing_metadata_ready; uint32_t test_only_last_message_flags; grpc_closure release_call; @@ -1209,7 +1210,6 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( - &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } if (bctl->op.send_message) { @@ -1217,14 +1217,9 @@ static void post_batch_completion(batch_control* bctl) { } if (bctl->op.send_trailing_metadata) { grpc_metadata_batch_destroy( - &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } if (bctl->op.recv_trailing_metadata) { - grpc_metadata_batch* md = - &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - recv_trailing_filter(call, md); - /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); parent_call* pc = get_parent_call(call); @@ -1246,7 +1241,6 @@ static void post_batch_completion(batch_control* bctl) { } gpr_mu_unlock(&pc->child_list_mu); } - if (call->is_client) { get_final_status(call, set_status_value_directly, call->final_op.client.status, @@ -1256,7 +1250,6 @@ static void post_batch_completion(batch_control* bctl) { get_final_status(call, set_cancelled_value, call->final_op.server.cancelled, nullptr, nullptr); } - GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } @@ -1538,6 +1531,19 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { finish_batch_step(bctl); } +static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { + batch_control* bctl = static_cast<batch_control*>(bctlp); + grpc_call* call = bctl->call; + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); + add_batch_error(bctl, GRPC_ERROR_REF(error), false); + if (error == GRPC_ERROR_NONE) { + grpc_metadata_batch* md = + &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + recv_trailing_filter(call, md); + } + finish_batch_step(bctl); +} + static void finish_batch(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; @@ -1558,7 +1564,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, size_t i; const grpc_op* op; batch_control* bctl; - int num_completion_callbacks_needed = 1; + bool has_send_ops = false; + int num_recv_ops = 0; grpc_call_error error = GRPC_CALL_OK; grpc_transport_stream_op_batch* stream_op; grpc_transport_stream_op_batch_payload* stream_op_payload; @@ -1664,6 +1671,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, stream_op_payload->send_initial_metadata.peer_string = &call->peer_string; } + has_send_ops = true; break; } case GRPC_OP_SEND_MESSAGE: { @@ -1693,6 +1701,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, &op->data.send_message.send_message->data.raw.slice_buffer, flags); stream_op_payload->send_message.send_message.reset( call->sending_stream.get()); + has_send_ops = true; break; } case GRPC_OP_SEND_CLOSE_FROM_CLIENT: { @@ -1713,6 +1722,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->sent_final_op = true; stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; + has_send_ops = true; break; } case GRPC_OP_SEND_STATUS_FROM_SERVER: { @@ -1777,6 +1787,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, } stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; + has_send_ops = true; break; } case GRPC_OP_RECV_INITIAL_METADATA: { @@ -1804,7 +1815,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, stream_op_payload->recv_initial_metadata.peer_string = &call->peer_string; } - num_completion_callbacks_needed++; + ++num_recv_ops; break; } case GRPC_OP_RECV_MESSAGE: { @@ -1826,7 +1837,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &call->receiving_stream_ready; - num_completion_callbacks_needed++; + ++num_recv_ops; break; } case GRPC_OP_RECV_STATUS_ON_CLIENT: { @@ -1852,11 +1863,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->final_op.client.error_string = op->data.recv_status_on_client.error_string; stream_op->recv_trailing_metadata = true; - stream_op->collect_stats = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op_payload->collect_stats.collect_stats = + stream_op_payload->recv_trailing_metadata.collect_stats = &call->final_info.stats.transport_stream_stats; + GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready, + receiving_trailing_metadata_ready, bctl, + grpc_schedule_on_exec_ctx); + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call->receiving_trailing_metadata_ready; + ++num_recv_ops; break; } case GRPC_OP_RECV_CLOSE_ON_SERVER: { @@ -1877,11 +1893,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; stream_op->recv_trailing_metadata = true; - stream_op->collect_stats = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op_payload->collect_stats.collect_stats = + stream_op_payload->recv_trailing_metadata.collect_stats = &call->final_info.stats.transport_stream_stats; + GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready, + receiving_trailing_metadata_ready, bctl, + grpc_schedule_on_exec_ctx); + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call->receiving_trailing_metadata_ready; + ++num_recv_ops; break; } } @@ -1891,13 +1912,15 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, if (!is_notify_tag_closure) { GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); } - gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); + gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops); - GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, - grpc_schedule_on_exec_ctx); - stream_op->on_complete = &bctl->finish_batch; - gpr_atm_rel_store(&call->any_ops_sent_atm, 1); + if (has_send_ops) { + GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, + grpc_schedule_on_exec_ctx); + stream_op->on_complete = &bctl->finish_batch; + } + gpr_atm_rel_store(&call->any_ops_sent_atm, 1); execute_batch(call, stream_op, &bctl->start_batch); done: |