diff options
author | 2018-06-08 14:22:12 -0700 | |
---|---|---|
committer | 2018-06-08 14:22:12 -0700 | |
commit | f3715134458cb14efd855d948f229dc2661b4028 (patch) | |
tree | f3569a69146e234a77463f4219f6513cafce77f1 /src/core/lib | |
parent | 6fcbee0d03a20c61ac2ba785a7524cd84e888108 (diff) |
Revert "move recv_trailing_metadata into its own callback, don't use on_complete for recv_ops"
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/channel/connected_channel.cc | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/call_combiner.h | 80 | ||||
-rw-r--r-- | src/core/lib/iomgr/closure.h | 5 | ||||
-rw-r--r-- | src/core/lib/surface/call.cc | 63 | ||||
-rw-r--r-- | src/core/lib/transport/transport.cc | 29 | ||||
-rw-r--r-- | src/core/lib/transport/transport.h | 22 | ||||
-rw-r--r-- | src/core/lib/transport/transport_op_string.cc | 7 |
7 files changed, 49 insertions, 166 deletions
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index e2ea334ded..ddd3029402 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -51,7 +51,6 @@ typedef struct connected_channel_call_data { callback_state on_complete[6]; // Max number of pending batches. callback_state recv_initial_metadata_ready; callback_state recv_message_ready; - callback_state recv_trailing_metadata_ready; } call_data; static void run_in_call_combiner(void* arg, grpc_error* error) { @@ -112,12 +111,6 @@ static void con_start_transport_stream_op_batch( intercept_callback(calld, state, false, "recv_message_ready", &batch->payload->recv_message.recv_message_ready); } - if (batch->recv_trailing_metadata) { - callback_state* state = &calld->recv_trailing_metadata_ready; - intercept_callback( - calld, state, false, "recv_trailing_metadata_ready", - &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready); - } if (batch->cancel_stream) { // There can be more than one cancellation batch in flight at any // given time, so we can't just pick out a fixed index into @@ -128,7 +121,7 @@ static void con_start_transport_stream_op_batch( static_cast<callback_state*>(gpr_malloc(sizeof(*state))); intercept_callback(calld, state, true, "on_complete (cancel_stream)", &batch->on_complete); - } else if (batch->on_complete != nullptr) { + } else { callback_state* state = get_state_for_batch(calld, batch); intercept_callback(calld, state, false, "on_complete", &batch->on_complete); } diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h index f9ce29f231..0ccd08ea57 100644 --- a/src/core/lib/iomgr/call_combiner.h +++ b/src/core/lib/iomgr/call_combiner.h @@ -26,7 +26,6 @@ #include <grpc/support/atm.h> #include "src/core/lib/gpr/mpscq.h" -#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/iomgr/closure.h" // A simple, lock-free mechanism for serializing activity related to a @@ -110,83 +109,4 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner, void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner, grpc_error* error); -namespace grpc_core { - -// Helper for running a list of closures in a call combiner. -// -// Each callback running in the call combiner will eventually be -// returned to the surface, at which point the surface will yield the -// call combiner. So when we are running in the call combiner and have -// more than one callback to return to the surface, we need to re-enter -// the call combiner for all but one of those callbacks. -class CallCombinerClosureList { - public: - CallCombinerClosureList() {} - - // Adds a closure to the list. The closure must eventually result in - // the call combiner being yielded. - void Add(grpc_closure* closure, grpc_error* error, const char* reason) { - closures_.emplace_back(closure, error, reason); - } - - // Runs all closures in the call combiner and yields the call combiner. - // - // All but one of the closures in the list will be scheduled via - // GRPC_CALL_COMBINER_START(), and the remaining closure will be - // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in - // yielding the call combiner. If the list is empty, then the call - // combiner will be yielded immediately. - void RunClosures(grpc_call_combiner* call_combiner) { - for (size_t i = 1; i < closures_.size(); ++i) { - auto& closure = closures_[i]; - GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, - closure.reason); - } - if (closures_.size() > 0) { - if (grpc_call_combiner_trace.enabled()) { - gpr_log(GPR_INFO, - "CallCombinerClosureList executing closure while already " - "holding call_combiner %p: closure=%p error=%s reason=%s", - call_combiner, closures_[0].closure, - grpc_error_string(closures_[0].error), closures_[0].reason); - } - // This will release the call combiner. - GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error); - } else { - GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule"); - } - closures_.clear(); - } - - // Runs all closures in the call combiner, but does NOT yield the call - // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START(). - void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) { - for (size_t i = 0; i < closures_.size(); ++i) { - auto& closure = closures_[i]; - GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, - closure.reason); - } - closures_.clear(); - } - - size_t size() const { return closures_.size(); } - - private: - struct CallCombinerClosure { - grpc_closure* closure; - grpc_error* error; - const char* reason; - - CallCombinerClosure(grpc_closure* closure, grpc_error* error, - const char* reason) - : closure(closure), error(error), reason(reason) {} - }; - - // There are generally a maximum of 6 closures to run in the call - // combiner, one for each pending op. - InlinedVector<CallCombinerClosure, 6> closures_; -}; - -} // namespace grpc_core - #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index f14c723844..34a494485d 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -283,10 +283,9 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) { if (c->scheduled) { gpr_log(GPR_ERROR, "Closure already scheduled. (closure: %p, created: [%s:%d], " - "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], " - "run?: %s", + "previously scheduled at: [%s: %d] run?: %s", c, c->file_created, c->line_created, c->file_initiated, - c->line_initiated, file, line, c->run ? "true" : "false"); + c->line_initiated, c->run ? "true" : "false"); abort(); } c->scheduled = true; diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index d44846cd12..1cf8ea94e7 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -233,7 +233,6 @@ struct grpc_call { grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; grpc_closure receiving_initial_metadata_ready; - grpc_closure receiving_trailing_metadata_ready; uint32_t test_only_last_message_flags; grpc_closure release_call; @@ -1210,6 +1209,7 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( + &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } if (bctl->op.send_message) { @@ -1217,9 +1217,14 @@ static void post_batch_completion(batch_control* bctl) { } if (bctl->op.send_trailing_metadata) { grpc_metadata_batch_destroy( + &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } if (bctl->op.recv_trailing_metadata) { + grpc_metadata_batch* md = + &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + recv_trailing_filter(call, md); + /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); parent_call* pc = get_parent_call(call); @@ -1241,6 +1246,7 @@ static void post_batch_completion(batch_control* bctl) { } gpr_mu_unlock(&pc->child_list_mu); } + if (call->is_client) { get_final_status(call, set_status_value_directly, call->final_op.client.status, @@ -1250,6 +1256,7 @@ static void post_batch_completion(batch_control* bctl) { get_final_status(call, set_cancelled_value, call->final_op.server.cancelled, nullptr, nullptr); } + GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } @@ -1531,19 +1538,6 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { finish_batch_step(bctl); } -static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { - batch_control* bctl = static_cast<batch_control*>(bctlp); - grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); - add_batch_error(bctl, GRPC_ERROR_REF(error), false); - if (error == GRPC_ERROR_NONE) { - grpc_metadata_batch* md = - &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - recv_trailing_filter(call, md); - } - finish_batch_step(bctl); -} - static void finish_batch(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; @@ -1564,8 +1558,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, size_t i; const grpc_op* op; batch_control* bctl; - bool has_send_ops = false; - int num_recv_ops = 0; + int num_completion_callbacks_needed = 1; grpc_call_error error = GRPC_CALL_OK; grpc_transport_stream_op_batch* stream_op; grpc_transport_stream_op_batch_payload* stream_op_payload; @@ -1671,7 +1664,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, stream_op_payload->send_initial_metadata.peer_string = &call->peer_string; } - has_send_ops = true; break; } case GRPC_OP_SEND_MESSAGE: { @@ -1701,7 +1693,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, &op->data.send_message.send_message->data.raw.slice_buffer, flags); stream_op_payload->send_message.send_message.reset( call->sending_stream.get()); - has_send_ops = true; break; } case GRPC_OP_SEND_CLOSE_FROM_CLIENT: { @@ -1722,7 +1713,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->sent_final_op = true; stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; - has_send_ops = true; break; } case GRPC_OP_SEND_STATUS_FROM_SERVER: { @@ -1787,7 +1777,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, } stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; - has_send_ops = true; break; } case GRPC_OP_RECV_INITIAL_METADATA: { @@ -1815,7 +1804,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, stream_op_payload->recv_initial_metadata.peer_string = &call->peer_string; } - ++num_recv_ops; + num_completion_callbacks_needed++; break; } case GRPC_OP_RECV_MESSAGE: { @@ -1837,7 +1826,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &call->receiving_stream_ready; - ++num_recv_ops; + num_completion_callbacks_needed++; break; } case GRPC_OP_RECV_STATUS_ON_CLIENT: { @@ -1863,16 +1852,11 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->final_op.client.error_string = op->data.recv_status_on_client.error_string; stream_op->recv_trailing_metadata = true; + stream_op->collect_stats = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op_payload->recv_trailing_metadata.collect_stats = + stream_op_payload->collect_stats.collect_stats = &call->final_info.stats.transport_stream_stats; - GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready, - receiving_trailing_metadata_ready, bctl, - grpc_schedule_on_exec_ctx); - stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &call->receiving_trailing_metadata_ready; - ++num_recv_ops; break; } case GRPC_OP_RECV_CLOSE_ON_SERVER: { @@ -1893,16 +1877,11 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; stream_op->recv_trailing_metadata = true; + stream_op->collect_stats = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op_payload->recv_trailing_metadata.collect_stats = + stream_op_payload->collect_stats.collect_stats = &call->final_info.stats.transport_stream_stats; - GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready, - receiving_trailing_metadata_ready, bctl, - grpc_schedule_on_exec_ctx); - stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &call->receiving_trailing_metadata_ready; - ++num_recv_ops; break; } } @@ -1912,15 +1891,13 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, if (!is_notify_tag_closure) { GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); } - gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops); - - if (has_send_ops) { - GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, - grpc_schedule_on_exec_ctx); - stream_op->on_complete = &bctl->finish_batch; - } + gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); + GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, + grpc_schedule_on_exec_ctx); + stream_op->on_complete = &bctl->finish_batch; gpr_atm_rel_store(&call->any_ops_sent_atm, 1); + execute_batch(call, stream_op, &bctl->start_batch); done: diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index cbdb77c844..039d603394 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -212,32 +212,21 @@ void grpc_transport_stream_op_batch_finish_with_failure( if (batch->send_message) { batch->payload->send_message.send_message.reset(); } - if (batch->cancel_stream) { - GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); + if (batch->recv_message) { + GRPC_CALL_COMBINER_START( + call_combiner, batch->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error), "failing recv_message_ready"); } - // Construct a list of closures to execute. - grpc_core::CallCombinerClosureList closures; if (batch->recv_initial_metadata) { - closures.Add( + GRPC_CALL_COMBINER_START( + call_combiner, batch->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready"); } - if (batch->recv_message) { - closures.Add(batch->payload->recv_message.recv_message_ready, - GRPC_ERROR_REF(error), "failing recv_message_ready"); - } - if (batch->recv_trailing_metadata) { - closures.Add( - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready, - GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready"); - } - if (batch->on_complete != nullptr) { - closures.Add(batch->on_complete, GRPC_ERROR_REF(error), - "failing on_complete"); + GRPC_CLOSURE_SCHED(batch->on_complete, error); + if (batch->cancel_stream) { + GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); } - // Execute closures. - closures.RunClosures(call_combiner); - GRPC_ERROR_UNREF(error); } typedef struct { diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 585b9dfae9..b2e252d939 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -122,15 +122,9 @@ typedef struct grpc_transport_stream_op_batch_payload /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op_batch { - /** Should be scheduled when all of the non-recv operations in the batch - are complete. - - The recv ops (recv_initial_metadata, recv_message, and - recv_trailing_metadata) each have their own callbacks. If a batch - contains both recv ops and non-recv ops, on_complete should be - scheduled as soon as the non-recv ops are complete, regardless of - whether or not the recv ops are complete. If a batch contains - only recv ops, on_complete can be null. */ + /** Should be enqueued when all requested operations (excluding recv_message + and recv_initial_metadata which have their own closures) in a given batch + have been completed. */ grpc_closure* on_complete; /** Values for the stream op (fields set are determined by flags above) */ @@ -155,6 +149,9 @@ typedef struct grpc_transport_stream_op_batch { */ bool recv_trailing_metadata : 1; + /** Collect any stats into provided buffer, zero internal stat counters */ + bool collect_stats : 1; + /** Cancel this stream with the provided error */ bool cancel_stream : 1; @@ -222,11 +219,12 @@ struct grpc_transport_stream_op_batch_payload { struct { grpc_metadata_batch* recv_trailing_metadata; - grpc_transport_stream_stats* collect_stats; - /** Should be enqueued when initial metadata is ready to be processed. */ - grpc_closure* recv_trailing_metadata_ready; } recv_trailing_metadata; + struct { + grpc_transport_stream_stats* collect_stats; + } collect_stats; + /** Forcefully close this stream. The HTTP2 semantics should be: - server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc index 8c7db642a5..25ab492f3a 100644 --- a/src/core/lib/transport/transport_op_string.cc +++ b/src/core/lib/transport/transport_op_string.cc @@ -120,6 +120,13 @@ char* grpc_transport_stream_op_batch_string( gpr_strvec_add(&b, tmp); } + if (op->collect_stats) { + gpr_strvec_add(&b, gpr_strdup(" ")); + gpr_asprintf(&tmp, "COLLECT_STATS:%p", + op->payload->collect_stats.collect_stats); + gpr_strvec_add(&b, tmp); + } + out = gpr_strvec_flatten(&b, nullptr); gpr_strvec_destroy(&b); |