diff options
Diffstat (limited to 'src/core/lib/surface/call.cc')
-rw-r--r-- | src/core/lib/surface/call.cc | 59 |
1 files changed, 40 insertions, 19 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index e012244b0f..52a7ea0f89 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -187,6 +187,7 @@ struct grpc_call { grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; grpc_closure receiving_initial_metadata_ready; + grpc_closure receiving_trailing_metadata_ready; uint32_t test_only_last_message_flags; bool cancelled; @@ -1079,7 +1080,6 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( - &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } if (bctl->op.send_message) { @@ -1087,14 +1087,9 @@ static void post_batch_completion(batch_control* bctl) { } if (bctl->op.send_trailing_metadata) { grpc_metadata_batch_destroy( - &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } if (bctl->op.recv_trailing_metadata) { - grpc_metadata_batch* md = - &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - recv_trailing_filter(call, md, GRPC_ERROR_REF(bctl->batch_error)); - /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); parent_call* pc = get_parent_call(call); @@ -1115,7 +1110,6 @@ static void post_batch_completion(batch_control* bctl) { } gpr_mu_unlock(&pc->child_list_mu); } - GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } @@ -1390,6 +1384,16 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { finish_batch_step(bctl); } +static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { + batch_control* bctl = static_cast<batch_control*>(bctlp); + grpc_call* call = bctl->call; + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); + grpc_metadata_batch* md = + &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + recv_trailing_filter(call, md, GRPC_ERROR_REF(error)); + finish_batch_step(bctl); +} + static void finish_batch(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; @@ -1415,7 +1419,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, size_t i; const grpc_op* op; batch_control* bctl; - int num_completion_callbacks_needed = 1; + bool has_send_ops = false; + int num_recv_ops = 0; grpc_call_error error = GRPC_CALL_OK; grpc_transport_stream_op_batch* stream_op; grpc_transport_stream_op_batch_payload* stream_op_payload; @@ -1521,6 +1526,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, stream_op_payload->send_initial_metadata.peer_string = &call->peer_string; } + has_send_ops = true; break; } case GRPC_OP_SEND_MESSAGE: { @@ -1550,6 +1556,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, &op->data.send_message.send_message->data.raw.slice_buffer, flags); stream_op_payload->send_message.send_message.reset( call->sending_stream.get()); + has_send_ops = true; break; } case GRPC_OP_SEND_CLOSE_FROM_CLIENT: { @@ -1570,6 +1577,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->sent_final_op = true; stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; + has_send_ops = true; break; } case GRPC_OP_SEND_STATUS_FROM_SERVER: { @@ -1632,6 +1640,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, } stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; + has_send_ops = true; break; } case GRPC_OP_RECV_INITIAL_METADATA: { @@ -1659,7 +1668,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, stream_op_payload->recv_initial_metadata.peer_string = &call->peer_string; } - num_completion_callbacks_needed++; + ++num_recv_ops; break; } case GRPC_OP_RECV_MESSAGE: { @@ -1681,7 +1690,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &call->receiving_stream_ready; - num_completion_callbacks_needed++; + ++num_recv_ops; break; } case GRPC_OP_RECV_STATUS_ON_CLIENT: { @@ -1707,11 +1716,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->final_op.client.error_string = op->data.recv_status_on_client.error_string; stream_op->recv_trailing_metadata = true; - stream_op->collect_stats = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op_payload->collect_stats.collect_stats = + stream_op_payload->recv_trailing_metadata.collect_stats = &call->final_info.stats.transport_stream_stats; + GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready, + receiving_trailing_metadata_ready, bctl, + grpc_schedule_on_exec_ctx); + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call->receiving_trailing_metadata_ready; + ++num_recv_ops; break; } case GRPC_OP_RECV_CLOSE_ON_SERVER: { @@ -1732,11 +1746,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; stream_op->recv_trailing_metadata = true; - stream_op->collect_stats = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op_payload->collect_stats.collect_stats = + stream_op_payload->recv_trailing_metadata.collect_stats = &call->final_info.stats.transport_stream_stats; + GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready, + receiving_trailing_metadata_ready, bctl, + grpc_schedule_on_exec_ctx); + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call->receiving_trailing_metadata_ready; + ++num_recv_ops; break; } } @@ -1746,13 +1765,15 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, if (!is_notify_tag_closure) { GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); } - gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); + gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops); - GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, - grpc_schedule_on_exec_ctx); - stream_op->on_complete = &bctl->finish_batch; - gpr_atm_rel_store(&call->any_ops_sent_atm, 1); + if (has_send_ops) { + GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, + grpc_schedule_on_exec_ctx); + stream_op->on_complete = &bctl->finish_batch; + } + gpr_atm_rel_store(&call->any_ops_sent_atm, 1); execute_batch(call, stream_op, &bctl->start_batch); done: |