aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-06-08 14:22:12 -0700
committerGravatar GitHub <noreply@github.com>2018-06-08 14:22:12 -0700
commitf3715134458cb14efd855d948f229dc2661b4028 (patch)
treef3569a69146e234a77463f4219f6513cafce77f1 /src/core/lib
parent6fcbee0d03a20c61ac2ba785a7524cd84e888108 (diff)
Revert "move recv_trailing_metadata into its own callback, don't use on_complete for recv_ops"
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/connected_channel.cc9
-rw-r--r--src/core/lib/iomgr/call_combiner.h80
-rw-r--r--src/core/lib/iomgr/closure.h5
-rw-r--r--src/core/lib/surface/call.cc63
-rw-r--r--src/core/lib/transport/transport.cc29
-rw-r--r--src/core/lib/transport/transport.h22
-rw-r--r--src/core/lib/transport/transport_op_string.cc7
7 files changed, 49 insertions, 166 deletions
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index e2ea334ded..ddd3029402 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -51,7 +51,6 @@ typedef struct connected_channel_call_data {
callback_state on_complete[6]; // Max number of pending batches.
callback_state recv_initial_metadata_ready;
callback_state recv_message_ready;
- callback_state recv_trailing_metadata_ready;
} call_data;
static void run_in_call_combiner(void* arg, grpc_error* error) {
@@ -112,12 +111,6 @@ static void con_start_transport_stream_op_batch(
intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
- if (batch->recv_trailing_metadata) {
- callback_state* state = &calld->recv_trailing_metadata_ready;
- intercept_callback(
- calld, state, false, "recv_trailing_metadata_ready",
- &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
- }
if (batch->cancel_stream) {
// There can be more than one cancellation batch in flight at any
// given time, so we can't just pick out a fixed index into
@@ -128,7 +121,7 @@ static void con_start_transport_stream_op_batch(
static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
intercept_callback(calld, state, true, "on_complete (cancel_stream)",
&batch->on_complete);
- } else if (batch->on_complete != nullptr) {
+ } else {
callback_state* state = get_state_for_batch(calld, batch);
intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
}
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index f9ce29f231..0ccd08ea57 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -26,7 +26,6 @@
#include <grpc/support/atm.h>
#include "src/core/lib/gpr/mpscq.h"
-#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/iomgr/closure.h"
// A simple, lock-free mechanism for serializing activity related to a
@@ -110,83 +109,4 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
grpc_error* error);
-namespace grpc_core {
-
-// Helper for running a list of closures in a call combiner.
-//
-// Each callback running in the call combiner will eventually be
-// returned to the surface, at which point the surface will yield the
-// call combiner. So when we are running in the call combiner and have
-// more than one callback to return to the surface, we need to re-enter
-// the call combiner for all but one of those callbacks.
-class CallCombinerClosureList {
- public:
- CallCombinerClosureList() {}
-
- // Adds a closure to the list. The closure must eventually result in
- // the call combiner being yielded.
- void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
- closures_.emplace_back(closure, error, reason);
- }
-
- // Runs all closures in the call combiner and yields the call combiner.
- //
- // All but one of the closures in the list will be scheduled via
- // GRPC_CALL_COMBINER_START(), and the remaining closure will be
- // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
- // yielding the call combiner. If the list is empty, then the call
- // combiner will be yielded immediately.
- void RunClosures(grpc_call_combiner* call_combiner) {
- for (size_t i = 1; i < closures_.size(); ++i) {
- auto& closure = closures_[i];
- GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
- closure.reason);
- }
- if (closures_.size() > 0) {
- if (grpc_call_combiner_trace.enabled()) {
- gpr_log(GPR_INFO,
- "CallCombinerClosureList executing closure while already "
- "holding call_combiner %p: closure=%p error=%s reason=%s",
- call_combiner, closures_[0].closure,
- grpc_error_string(closures_[0].error), closures_[0].reason);
- }
- // This will release the call combiner.
- GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
- } else {
- GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
- }
- closures_.clear();
- }
-
- // Runs all closures in the call combiner, but does NOT yield the call
- // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
- void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
- for (size_t i = 0; i < closures_.size(); ++i) {
- auto& closure = closures_[i];
- GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
- closure.reason);
- }
- closures_.clear();
- }
-
- size_t size() const { return closures_.size(); }
-
- private:
- struct CallCombinerClosure {
- grpc_closure* closure;
- grpc_error* error;
- const char* reason;
-
- CallCombinerClosure(grpc_closure* closure, grpc_error* error,
- const char* reason)
- : closure(closure), error(error), reason(reason) {}
- };
-
- // There are generally a maximum of 6 closures to run in the call
- // combiner, one for each pending op.
- InlinedVector<CallCombinerClosure, 6> closures_;
-};
-
-} // namespace grpc_core
-
#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index f14c723844..34a494485d 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -283,10 +283,9 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) {
if (c->scheduled) {
gpr_log(GPR_ERROR,
"Closure already scheduled. (closure: %p, created: [%s:%d], "
- "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
- "run?: %s",
+ "previously scheduled at: [%s: %d] run?: %s",
c, c->file_created, c->line_created, c->file_initiated,
- c->line_initiated, file, line, c->run ? "true" : "false");
+ c->line_initiated, c->run ? "true" : "false");
abort();
}
c->scheduled = true;
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index d44846cd12..1cf8ea94e7 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -233,7 +233,6 @@ struct grpc_call {
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
- grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags;
grpc_closure release_call;
@@ -1210,6 +1209,7 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
+
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
@@ -1217,9 +1217,14 @@ static void post_batch_completion(batch_control* bctl) {
}
if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
+
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
+ grpc_metadata_batch* md =
+ &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+ recv_trailing_filter(call, md);
+
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
parent_call* pc = get_parent_call(call);
@@ -1241,6 +1246,7 @@ static void post_batch_completion(batch_control* bctl) {
}
gpr_mu_unlock(&pc->child_list_mu);
}
+
if (call->is_client) {
get_final_status(call, set_status_value_directly,
call->final_op.client.status,
@@ -1250,6 +1256,7 @@ static void post_batch_completion(batch_control* bctl) {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
+
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@@ -1531,19 +1538,6 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
finish_batch_step(bctl);
}
-static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
- batch_control* bctl = static_cast<batch_control*>(bctlp);
- grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
- if (error == GRPC_ERROR_NONE) {
- grpc_metadata_batch* md =
- &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(call, md);
- }
- finish_batch_step(bctl);
-}
-
static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
@@ -1564,8 +1558,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t i;
const grpc_op* op;
batch_control* bctl;
- bool has_send_ops = false;
- int num_recv_ops = 0;
+ int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
grpc_transport_stream_op_batch* stream_op;
grpc_transport_stream_op_batch_payload* stream_op_payload;
@@ -1671,7 +1664,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->send_initial_metadata.peer_string =
&call->peer_string;
}
- has_send_ops = true;
break;
}
case GRPC_OP_SEND_MESSAGE: {
@@ -1701,7 +1693,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
&op->data.send_message.send_message->data.raw.slice_buffer, flags);
stream_op_payload->send_message.send_message.reset(
call->sending_stream.get());
- has_send_ops = true;
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1722,7 +1713,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->sent_final_op = true;
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
- has_send_ops = true;
break;
}
case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@@ -1787,7 +1777,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
- has_send_ops = true;
break;
}
case GRPC_OP_RECV_INITIAL_METADATA: {
@@ -1815,7 +1804,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->recv_initial_metadata.peer_string =
&call->peer_string;
}
- ++num_recv_ops;
+ num_completion_callbacks_needed++;
break;
}
case GRPC_OP_RECV_MESSAGE: {
@@ -1837,7 +1826,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
- ++num_recv_ops;
+ num_completion_callbacks_needed++;
break;
}
case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@@ -1863,16 +1852,11 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.client.error_string =
op->data.recv_status_on_client.error_string;
stream_op->recv_trailing_metadata = true;
+ stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->recv_trailing_metadata.collect_stats =
+ stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
- GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
- receiving_trailing_metadata_ready, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &call->receiving_trailing_metadata_ready;
- ++num_recv_ops;
break;
}
case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@@ -1893,16 +1877,11 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
stream_op->recv_trailing_metadata = true;
+ stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->recv_trailing_metadata.collect_stats =
+ stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
- GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
- receiving_trailing_metadata_ready, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &call->receiving_trailing_metadata_ready;
- ++num_recv_ops;
break;
}
}
@@ -1912,15 +1891,13 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
- gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
-
- if (has_send_ops) {
- GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op->on_complete = &bctl->finish_batch;
- }
+ gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
+ GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
+
execute_batch(call, stream_op, &bctl->start_batch);
done:
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index cbdb77c844..039d603394 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -212,32 +212,21 @@ void grpc_transport_stream_op_batch_finish_with_failure(
if (batch->send_message) {
batch->payload->send_message.send_message.reset();
}
- if (batch->cancel_stream) {
- GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
+ if (batch->recv_message) {
+ GRPC_CALL_COMBINER_START(
+ call_combiner, batch->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error), "failing recv_message_ready");
}
- // Construct a list of closures to execute.
- grpc_core::CallCombinerClosureList closures;
if (batch->recv_initial_metadata) {
- closures.Add(
+ GRPC_CALL_COMBINER_START(
+ call_combiner,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
}
- if (batch->recv_message) {
- closures.Add(batch->payload->recv_message.recv_message_ready,
- GRPC_ERROR_REF(error), "failing recv_message_ready");
- }
- if (batch->recv_trailing_metadata) {
- closures.Add(
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
- GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
- }
- if (batch->on_complete != nullptr) {
- closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
- "failing on_complete");
+ GRPC_CLOSURE_SCHED(batch->on_complete, error);
+ if (batch->cancel_stream) {
+ GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
}
- // Execute closures.
- closures.RunClosures(call_combiner);
- GRPC_ERROR_UNREF(error);
}
typedef struct {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 585b9dfae9..b2e252d939 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -122,15 +122,9 @@ typedef struct grpc_transport_stream_op_batch_payload
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op_batch {
- /** Should be scheduled when all of the non-recv operations in the batch
- are complete.
-
- The recv ops (recv_initial_metadata, recv_message, and
- recv_trailing_metadata) each have their own callbacks. If a batch
- contains both recv ops and non-recv ops, on_complete should be
- scheduled as soon as the non-recv ops are complete, regardless of
- whether or not the recv ops are complete. If a batch contains
- only recv ops, on_complete can be null. */
+ /** Should be enqueued when all requested operations (excluding recv_message
+ and recv_initial_metadata which have their own closures) in a given batch
+ have been completed. */
grpc_closure* on_complete;
/** Values for the stream op (fields set are determined by flags above) */
@@ -155,6 +149,9 @@ typedef struct grpc_transport_stream_op_batch {
*/
bool recv_trailing_metadata : 1;
+ /** Collect any stats into provided buffer, zero internal stat counters */
+ bool collect_stats : 1;
+
/** Cancel this stream with the provided error */
bool cancel_stream : 1;
@@ -222,11 +219,12 @@ struct grpc_transport_stream_op_batch_payload {
struct {
grpc_metadata_batch* recv_trailing_metadata;
- grpc_transport_stream_stats* collect_stats;
- /** Should be enqueued when initial metadata is ready to be processed. */
- grpc_closure* recv_trailing_metadata_ready;
} recv_trailing_metadata;
+ struct {
+ grpc_transport_stream_stats* collect_stats;
+ } collect_stats;
+
/** Forcefully close this stream.
The HTTP2 semantics should be:
- server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 8c7db642a5..25ab492f3a 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -120,6 +120,13 @@ char* grpc_transport_stream_op_batch_string(
gpr_strvec_add(&b, tmp);
}
+ if (op->collect_stats) {
+ gpr_strvec_add(&b, gpr_strdup(" "));
+ gpr_asprintf(&tmp, "COLLECT_STATS:%p",
+ op->payload->collect_stats.collect_stats);
+ gpr_strvec_add(&b, tmp);
+ }
+
out = gpr_strvec_flatten(&b, nullptr);
gpr_strvec_destroy(&b);